airbyte.sources.base
Base class implementation for sources.
```python
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Base class implementation for sources."""

from __future__ import annotations

import json
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal

import yaml
from rich import print  # noqa: A004  # Allow shadowing the built-in
from rich.syntax import Syntax

from airbyte_protocol.models import (
    AirbyteCatalog,
    AirbyteMessage,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
    Type,
)

from airbyte import exceptions as exc
from airbyte._connector_base import ConnectorBase
from airbyte._message_iterators import AirbyteMessageIterator
from airbyte._util.temp_files import as_temp_files
from airbyte.caches.util import get_default_cache
from airbyte.datasets._lazy import LazyDataset
from airbyte.progress import ProgressStyle, ProgressTracker
from airbyte.records import StreamRecord, StreamRecordHandler
from airbyte.results import ReadResult
from airbyte.shared.catalog_providers import CatalogProvider
from airbyte.strategies import WriteStrategy


if TYPE_CHECKING:
    from collections.abc import Generator, Iterable, Iterator

    from airbyte_cdk import ConnectorSpecification
    from airbyte_protocol.models import AirbyteStream

    from airbyte._executors.base import Executor
    from airbyte.caches import CacheBase
    from airbyte.callbacks import ConfigChangeCallback
    from airbyte.documents import Document
    from airbyte.shared.state_providers import StateProviderBase
    from airbyte.shared.state_writers import StateWriterBase


class Source(ConnectorBase):  # noqa: PLR0904
    """A class representing a source that can be called."""

    connector_type = "source"

    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        streams: str | list[str] | None = None,
        validate: bool = False,
        cursor_key_overrides: dict[str, str] | None = None,
        primary_key_overrides: dict[str, str | list[str]] | None = None,
    ) -> None:
        """Initialize the source.

        If config is provided, it will be validated against the spec if validate is True.
        """
        self._to_be_selected_streams: list[str] | str = []
        """Used to hold selection criteria before catalog is known."""

        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )
        self._config_dict: dict[str, Any] | None = None
        self._last_log_messages: list[str] = []
        self._discovered_catalog: AirbyteCatalog | None = None
        self._selected_stream_names: list[str] = []

        self._cursor_key_overrides: dict[str, str] = {}
        """A mapping of lower-cased stream names to cursor key overrides."""

        self._primary_key_overrides: dict[str, list[str]] = {}
        """A mapping of lower-cased stream names to primary key overrides."""

        if config is not None:
            self.set_config(config, validate=validate)
        if streams is not None:
            self.select_streams(streams)
        if cursor_key_overrides is not None:
            self.set_cursor_keys(**cursor_key_overrides)
        if primary_key_overrides is not None:
            self.set_primary_keys(**primary_key_overrides)

    def set_streams(self, streams: list[str]) -> None:
        """Deprecated. See select_streams()."""
        warnings.warn(
            "The 'set_streams' method is deprecated and will be removed in a future version. "
            "Please use the 'select_streams' method instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.select_streams(streams)

    def set_cursor_key(
        self,
        stream_name: str,
        cursor_key: str,
    ) -> None:
        """Set the cursor for a single stream.

        Note:
        - This does not unset previously set cursors.
        - The cursor key must be a single field name.
        - Not all streams support custom cursors. If a stream does not support custom cursors,
          the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._cursor_key_overrides[stream_name.lower()] = cursor_key

    def set_cursor_keys(
        self,
        **kwargs: str,
    ) -> None:
        """Override the cursor key for one or more streams.

        Usage:
            source.set_cursor_keys(
                stream1="cursor1",
                stream2="cursor2",
            )

        Note:
        - This does not unset previously set cursors.
        - The cursor key must be a single field name.
        - Not all streams support custom cursors. If a stream does not support custom cursors,
          the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})

    def set_primary_key(
        self,
        stream_name: str,
        primary_key: str | list[str],
    ) -> None:
        """Set the primary key for a single stream.

        Note:
        - This does not unset previously set primary keys.
        - The primary key must be a single field name or a list of field names.
        - Not all streams support overriding primary keys. If a stream does not support
          overriding primary keys, the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._primary_key_overrides[stream_name.lower()] = (
            primary_key if isinstance(primary_key, list) else [primary_key]
        )

    def set_primary_keys(
        self,
        **kwargs: str | list[str],
    ) -> None:
        """Override the primary keys for one or more streams.

        This does not unset previously set primary keys.

        Usage:
            source.set_primary_keys(
                stream1="pk1",
                stream2=["pk1", "pk2"],
            )

        Note:
        - This does not unset previously set primary keys.
        - The primary key must be a single field name or a list of field names.
        - Not all streams support overriding primary keys. If a stream does not support
          overriding primary keys, the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._primary_key_overrides.update(
            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
        )

    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
        """Log a warning listing streams that will be selected once the config is set."""
        if streams == "*":
            print(
                "Warning: Config is not set yet. All streams will be selected after config is set."
            )
        else:
            print(
                "Warning: Config is not set yet. "
                f"Streams to be selected after config is set: {streams}"
            )

    def select_all_streams(self) -> None:
        """Select all streams.

        This is a more streamlined equivalent to:
        > source.select_streams(source.get_available_streams()).
        """
        if self._config_dict is None:
            self._to_be_selected_streams = "*"
            self._log_warning_preselected_stream(self._to_be_selected_streams)
            return

        self._selected_stream_names = self.get_available_streams()

    def select_streams(self, streams: str | list[str]) -> None:
        """Select the stream names that should be read from the connector.

        Args:
            streams: A list of stream names to select. If set to "*", all streams will be
                selected.

        Currently, if this is not set, all streams will be read.
        """
        if self._config_dict is None:
            self._to_be_selected_streams = streams
            self._log_warning_preselected_stream(streams)
            return

        if streams == "*":
            self.select_all_streams()
            return

        if isinstance(streams, str):
            # If a single stream is provided, convert it to a one-item list
            streams = [streams]

        available_streams = self.get_available_streams()
        for stream in streams:
            if stream not in available_streams:
                raise exc.AirbyteStreamNotFoundError(
                    stream_name=stream,
                    connector_name=self.name,
                    available_streams=available_streams,
                )
        self._selected_stream_names = streams

    def get_selected_streams(self) -> list[str]:
        """Get the selected streams.

        If no streams are selected, return an empty list.
        """
        return self._selected_stream_names

    def set_config(
        self,
        config: dict[str, Any],
        *,
        validate: bool = True,
    ) -> None:
        """Set the config for the connector.

        If validate is True, raise an exception if the config fails validation.

        If validate is False, validation will be deferred until check() or validate_config()
        is called.
        """
        if validate:
            self.validate_config(config)

        self._config_dict = config

        if self._to_be_selected_streams:
            self.select_streams(self._to_be_selected_streams)
            self._to_be_selected_streams = []

    def get_config(self) -> dict[str, Any]:
        """Get the config for the connector."""
        return self._config

    @property
    def _config(self) -> dict[str, Any]:
        if self._config_dict is None:
            raise exc.AirbyteConnectorConfigurationMissingError(
                connector_name=self.name,
                guidance="Provide via get_source() or set_config()",
            )
        return self._config_dict

    def _discover(self) -> AirbyteCatalog:
        """Call discover on the connector.

        This involves the following steps:
        - Write the config to a temporary file
        - Execute the connector with discover --config <config_file>
        - Listen to the messages and return the first AirbyteCatalog that comes along.
        - Make sure the subprocess is killed when the function returns.
        """
        with as_temp_files([self._config]) as [config_file]:
            for msg in self._execute(["discover", "--config", config_file]):
                if msg.type == Type.CATALOG and msg.catalog:
                    return msg.catalog
            raise exc.AirbyteConnectorMissingCatalogError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            )

    def get_available_streams(self) -> list[str]:
        """Get the available streams from the discovered catalog."""
        return [s.name for s in self.discovered_catalog.streams]

    def _get_incremental_stream_names(self) -> list[str]:
        """Get the names of streams that support incremental sync."""
        return [
            stream.name
            for stream in self.discovered_catalog.streams
            if SyncMode.incremental in stream.supported_sync_modes
        ]

    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
        """Call spec on the connector.

        This involves the following steps:
        * Execute the connector with spec
        * Listen to the messages and return the first ConnectorSpecification that comes along.
        * Make sure the subprocess is killed when the function returns.
        """
        if force_refresh or self._spec is None:
            for msg in self._execute(["spec"]):
                if msg.type == Type.SPEC and msg.spec:
                    self._spec = msg.spec
                    break

        if self._spec:
            return self._spec

        raise exc.AirbyteConnectorMissingSpecError(
            connector_name=self.name,
            log_text=self._last_log_messages,
        )

    @property
    def config_spec(self) -> dict[str, Any]:
        """Generate a configuration spec for this connector, as a JSON Schema definition.

        This function generates a JSON Schema dictionary with configuration specs for the
        current connector, as a dictionary.

        Returns:
            dict: The JSON Schema configuration spec as a dictionary.
        """
        return self._get_spec(force_refresh=True).connectionSpecification

    def print_config_spec(
        self,
        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
        *,
        output_file: Path | str | None = None,
    ) -> None:
        """Print the configuration spec for this connector.

        Args:
            format: The format to print the spec in. Must be "yaml" or "json".
            output_file: Optional. If set, the spec will be written to the given file path.
                Otherwise, it will be printed to the console.
        """
        if format not in {"yaml", "json"}:
            raise exc.PyAirbyteInputError(
                message="Invalid format. Expected 'yaml' or 'json'",
                input_value=format,
            )
        if isinstance(output_file, str):
            output_file = Path(output_file)

        if format == "yaml":
            content = yaml.dump(self.config_spec, indent=2)
        elif format == "json":
            content = json.dumps(self.config_spec, indent=2)

        if output_file:
            output_file.write_text(content)
            return

        syntax_highlighted = Syntax(content, format)
        print(syntax_highlighted)

    @property
    def _yaml_spec(self) -> str:
        """Get the spec as a yaml string.

        For now, the primary use case is for writing and debugging a valid config for a source.

        This is private for now because we probably want better polish before exposing this
        as a stable interface. This will also get easier when we have docs links with this info
        for each connector.
        """
        spec_obj: ConnectorSpecification = self._get_spec()
        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
        # Convert to a yaml string
        return yaml.dump(spec_dict)

    @property
    def docs_url(self) -> str:
        """Get the URL to the connector's documentation."""
        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
            "source-", ""
        )

    @property
    def discovered_catalog(self) -> AirbyteCatalog:
        """Get the raw catalog for the given streams.

        If the catalog is not yet known, we call discover to get it.
        """
        if self._discovered_catalog is None:
            self._discovered_catalog = self._discover()

        return self._discovered_catalog

    @property
    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
        """Get the configured catalog for the given streams.

        If the raw catalog is not yet known, we call discover to get it.

        If no specific streams are selected, we return a catalog that syncs all available streams.

        TODO: We should consider disabling by default the streams that the connector would
        disable by default. (For instance, streams that require a premium license are sometimes
        disabled by default within the connector.)
        """
        # Ensure discovered catalog is cached before we start
        _ = self.discovered_catalog

        # Filter for selected streams if set, otherwise use all available streams:
        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
        return self.get_configured_catalog(streams=streams_filter)

    def get_configured_catalog(
        self,
        streams: Literal["*"] | list[str] | None = None,
    ) -> ConfiguredAirbyteCatalog:
        """Get a configured catalog for the given streams.

        If no streams are provided, the selected streams will be used. If no streams are selected,
        all available streams will be used.

        If '*' is provided, all available streams will be used.
        """
        selected_streams: list[str] = []
        if streams is None:
            selected_streams = self._selected_stream_names or self.get_available_streams()
        elif streams == "*":
            selected_streams = self.get_available_streams()
        elif isinstance(streams, list):
            selected_streams = streams
        else:
            raise exc.PyAirbyteInputError(
                message="Invalid streams argument.",
                input_value=streams,
            )

        return ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=stream,
                    destination_sync_mode=DestinationSyncMode.overwrite,
                    sync_mode=SyncMode.incremental,
                    primary_key=(
                        [self._primary_key_overrides[stream.name.lower()]]
                        if stream.name.lower() in self._primary_key_overrides
                        else stream.source_defined_primary_key
                    ),
                    cursor_field=(
                        [self._cursor_key_overrides[stream.name.lower()]]
                        if stream.name.lower() in self._cursor_key_overrides
                        else stream.default_cursor_field
                    ),
                    # These are unused in the current implementation:
                    generation_id=None,
                    minimum_generation_id=None,
                    sync_id=None,
                )
                for stream in self.discovered_catalog.streams
                if stream.name in selected_streams
            ],
        )

    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
        """Return the JSON Schema spec for the specified stream name."""
        catalog: AirbyteCatalog = self.discovered_catalog
        found: list[AirbyteStream] = [
            stream for stream in catalog.streams if stream.name == stream_name
        ]

        if len(found) == 0:
            raise exc.PyAirbyteInputError(
                message="Stream name does not exist in catalog.",
                input_value=stream_name,
            )

        if len(found) > 1:
            raise exc.PyAirbyteInternalError(
                message="Duplicate streams found with the same name.",
                context={
                    "found_streams": found,
                },
            )

        return found[0].json_schema

    def get_records(
        self,
        stream: str,
        *,
        normalize_field_names: bool = False,
        prune_undeclared_fields: bool = True,
    ) -> LazyDataset:
        """Read a stream from the connector.

        Args:
            stream: The name of the stream to read.
            normalize_field_names: When `True`, field names will be normalized to lower case,
                with special characters removed. This matches the behavior of PyAirbyte caches
                and most Airbyte destinations.
            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the
                records, which generally matches the behavior of PyAirbyte caches and most
                Airbyte destinations, specifically when you expect the catalog may be stale.
                You can disable this to keep all fields in the records.

        This involves the following steps:
        * Call discover to get the catalog
        * Generate a configured catalog that syncs the given stream in full_refresh mode
        * Write the configured catalog and the config to a temporary file
        * Execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and return the AirbyteRecordMessages that come along.
        * Make sure the subprocess is killed when the function returns.
        """
        configured_catalog = self.get_configured_catalog(streams=[stream])
        if len(configured_catalog.streams) == 0:
            raise exc.PyAirbyteInputError(
                message="Requested stream does not exist.",
                context={
                    "stream": stream,
                    "available_streams": self.get_available_streams(),
                    "connector_name": self.name,
                },
            ) from KeyError(stream)

        configured_stream = configured_catalog.streams[0]

        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
            yield from records

        stream_record_handler = StreamRecordHandler(
            json_schema=self.get_stream_json_schema(stream),
            prune_extra_fields=prune_undeclared_fields,
            normalize_keys=normalize_field_names,
        )

        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
        progress_tracker = ProgressTracker(
            ProgressStyle.PLAIN,
            source=self,
            cache=None,
            destination=None,
            expected_streams=[stream],
        )

        iterator: Iterator[dict[str, Any]] = (
            StreamRecord.from_record_message(
                record_message=record.record,
                stream_record_handler=stream_record_handler,
            )
            for record in self._read_with_catalog(
                catalog=configured_catalog,
                progress_tracker=progress_tracker,
            )
            if record.record
        )
        progress_tracker.log_success()
        return LazyDataset(
            iterator,
            stream_metadata=configured_stream,
        )

    def get_documents(
        self,
        stream: str,
        title_property: str | None = None,
        content_properties: list[str] | None = None,
        metadata_properties: list[str] | None = None,
        *,
        render_metadata: bool = False,
    ) -> Iterable[Document]:
        """Read a stream from the connector and return the records as documents.

        If metadata_properties is not set, all properties that are not content will be added to
        the metadata.

        If render_metadata is True, metadata will be rendered in the document, as well as
        the main content.
        """
        return self.get_records(stream).to_documents(
            title_property=title_property,
            content_properties=content_properties,
            metadata_properties=metadata_properties,
            render_metadata=render_metadata,
        )

    def _get_airbyte_message_iterator(
        self,
        *,
        streams: Literal["*"] | list[str] | None = None,
        state_provider: StateProviderBase | None = None,
        progress_tracker: ProgressTracker,
        force_full_refresh: bool = False,
    ) -> AirbyteMessageIterator:
        """Get an AirbyteMessageIterator for this source."""
        return AirbyteMessageIterator(
            self._read_with_catalog(
                catalog=self.get_configured_catalog(streams=streams),
                state=state_provider if not force_full_refresh else None,
                progress_tracker=progress_tracker,
            )
        )

    def _read_with_catalog(
        self,
        catalog: ConfiguredAirbyteCatalog,
        progress_tracker: ProgressTracker,
        state: StateProviderBase | None = None,
    ) -> Generator[AirbyteMessage, None, None]:
        """Call read on the connector.

        This involves the following steps:
        * Write the config to a temporary file
        * Execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and return the AirbyteRecordMessages that come along.
        * Send out telemetry on the performed sync (with information about which source was used
          and the type of the cache)
        """
        with as_temp_files(
            [
                self._config,
                catalog.model_dump_json(),
                state.to_state_input_file_text() if state else "[]",
            ]
        ) as [
            config_file,
            catalog_file,
            state_file,
        ]:
            message_generator = self._execute(
                [
                    "read",
                    "--config",
                    config_file,
                    "--catalog",
                    catalog_file,
                    "--state",
                    state_file,
                ],
                progress_tracker=progress_tracker,
            )
            yield from progress_tracker.tally_records_read(message_generator)
        progress_tracker.log_read_complete()

    def _peek_airbyte_message(
        self,
        message: AirbyteMessage,
        *,
        raise_on_error: bool = True,
    ) -> None:
        """Process an Airbyte message.

        This method handles reading Airbyte messages and taking action, if needed, based on the
        message type. For instance, log messages are logged, records are tallied, and errors are
        raised as exceptions if `raise_on_error` is True.

        Raises:
            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
        """
        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)

    def _log_incremental_streams(
        self,
        *,
        incremental_streams: set[str] | None = None,
    ) -> None:
        """Log the streams which are using incremental sync mode."""
        log_message = (
            "The following streams are currently using incremental sync:\n"
            f"{incremental_streams}\n"
            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
        )
        print(log_message)

    def read(
        self,
        cache: CacheBase | None = None,
        *,
        streams: str | list[str] | None = None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
    ) -> ReadResult:
        """Read from the connector and write to the cache.

        Args:
            cache: The cache to write to. If not set, a default cache will be used.
            streams: Optional if already set. A list of stream names to select for reading. If
                set to "*", all streams will be selected.
            write_strategy: The strategy to use when writing to the cache. If a string, it must
                be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must
                be one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
                WriteStrategy.AUTO.
            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
                streams will be read in incremental mode if supported by the connector. This
                option must be True when using the "replace" strategy.
            skip_validation: If True, PyAirbyte will not pre-validate the input configuration
                before running the connector. This can be helpful in debugging, when you want to
                send configurations to the connector that otherwise might be rejected by JSON
                Schema validation rules.
        """
        cache = cache or get_default_cache()
        progress_tracker = ProgressTracker(
            source=self,
            cache=cache,
            destination=None,
            expected_streams=None,  # Will be set later
        )

        # Set up state provider if not in full refresh mode
        if force_full_refresh:
            state_provider: StateProviderBase | None = None
        else:
            state_provider = cache.get_state_provider(
                source_name=self._name,
            )
        state_writer = cache.get_state_writer(source_name=self._name)

        if streams:
            self.select_streams(streams)

        if not self._selected_stream_names:
            raise exc.PyAirbyteNoStreamsSelectedError(
                connector_name=self.name,
                available_streams=self.get_available_streams(),
            )

        try:
            result = self._read_to_cache(
                cache=cache,
                catalog_provider=CatalogProvider(self.configured_catalog),
                stream_names=self._selected_stream_names,
                state_provider=state_provider,
                state_writer=state_writer,
                write_strategy=write_strategy,
                force_full_refresh=force_full_refresh,
                skip_validation=skip_validation,
                progress_tracker=progress_tracker,
            )
        except exc.PyAirbyteInternalError as ex:
            progress_tracker.log_failure(exception=ex)
            raise exc.AirbyteConnectorFailedError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            ) from ex
        except Exception as ex:
            progress_tracker.log_failure(exception=ex)
            raise

        progress_tracker.log_success()
        return result

    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
        self,
        cache: CacheBase,
        *,
        catalog_provider: CatalogProvider,
        stream_names: list[str],
        state_provider: StateProviderBase | None,
        state_writer: StateWriterBase | None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
        progress_tracker: ProgressTracker,
    ) -> ReadResult:
        """Internal read method."""
        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
            warnings.warn(
                message=(
                    "Using `REPLACE` strategy without also setting `force_full_refresh=True` "
                    "could result in data loss. "
                    "To silence this warning, use the following: "
                    'warnings.filterwarnings("ignore", '
                    'category="airbyte.warnings.PyAirbyteDataLossWarning")'
                ),
                category=exc.PyAirbyteDataLossWarning,
                stacklevel=1,
            )
        if isinstance(write_strategy, str):
            try:
                write_strategy = WriteStrategy(write_strategy)
            except ValueError:
                raise exc.PyAirbyteInputError(
                    message="Invalid strategy",
                    context={
                        "write_strategy": write_strategy,
                        "available_strategies": [s.value for s in WriteStrategy],
                    },
                ) from None

        # Run optional validation step
        if not skip_validation:
            self.validate_config()

        # Log incremental streams if incremental streams are known
        if state_provider and state_provider.known_stream_names:
            # Retrieve the set of known streams that support incremental sync
            incremental_streams = (
                set(self._get_incremental_stream_names())
                & state_provider.known_stream_names
                & set(self.get_selected_streams())
            )
            if incremental_streams:
                self._log_incremental_streams(incremental_streams=incremental_streams)

        airbyte_message_iterator = AirbyteMessageIterator(
            self._read_with_catalog(
                catalog=catalog_provider.configured_catalog,
                state=state_provider,
                progress_tracker=progress_tracker,
            )
        )
        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
            stdin=airbyte_message_iterator,
            catalog_provider=catalog_provider,
            write_strategy=write_strategy,
            state_writer=state_writer,
            progress_tracker=progress_tracker,
        )

        # Flush the WAL, if applicable
        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API

        return ReadResult(
            source_name=self.name,
            progress_tracker=progress_tracker,
            processed_streams=stream_names,
            cache=cache,
        )


__all__ = [
    "Source",
]
```
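To ground the API above, here is a minimal end-to-end sketch. The `source-faker` connector and its `count` option are illustrative stand-ins; any installed source connector with a valid config follows the same flow.

```python
import airbyte as ab

# Illustrative connector and config; substitute your own source and settings.
source = ab.get_source("source-faker", config={"count": 1_000})
source.check()                    # validate the config and connectivity
source.select_streams(["users"])  # or source.select_all_streams()

# read() discovers the catalog, runs the connector, and writes to the
# default local cache (DuckDB) unless another cache is passed in.
result = source.read()
print(result)
```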
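Stream selection degrades gracefully when no config has been set yet: the criteria are stashed in `_to_be_selected_streams` and re-applied by `set_config()`. A sketch of that deferred behavior, with assumed stream names:

```python
import airbyte as ab

source = ab.get_source("source-faker")         # no config yet
source.select_streams(["users", "purchases"])  # deferred; prints a warning

# Once the config is set, the pending selection is validated and applied.
# Unknown stream names would raise AirbyteStreamNotFoundError here.
source.set_config({"count": 1_000})
print(source.get_selected_streams())           # ['users', 'purchases']
```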
104 def set_streams(self, streams: list[str]) -> None: 105 """Deprecated. See select_streams().""" 106 warnings.warn( 107 "The 'set_streams' method is deprecated and will be removed in a future version. " 108 "Please use the 'select_streams' method instead.", 109 DeprecationWarning, 110 stacklevel=2, 111 ) 112 self.select_streams(streams)
Deprecated. See select_streams().
114 def set_cursor_key( 115 self, 116 stream_name: str, 117 cursor_key: str, 118 ) -> None: 119 """Set the cursor for a single stream. 120 121 Note: 122 - This does not unset previously set cursors. 123 - The cursor key must be a single field name. 124 - Not all streams support custom cursors. If a stream does not support custom cursors, 125 the override may be ignored. 126 - Stream names are case insensitive, while field names are case sensitive. 127 - Stream names are not validated by PyAirbyte. If the stream name 128 does not exist in the catalog, the override may be ignored. 129 """ 130 self._cursor_key_overrides[stream_name.lower()] = cursor_key
Set the cursor for a single stream.
Note:
- This does not unset previously set cursors.
- The cursor key must be a single field name.
- Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
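For example, a minimal sketch of overriding a single stream's cursor. The connector name, its config, and the stream and field names below are illustrative assumptions, not values defined by this API:

    import airbyte as ab

    # Hypothetical connector and config, for illustration only:
    source = ab.get_source("source-faker", config={"count": 1_000})

    # "Users" and "users" refer to the same stream (case insensitive),
    # but "updated_at" must match the field's exact casing.
    source.set_cursor_key("users", "updated_at")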
132 def set_cursor_keys( 133 self, 134 **kwargs: str, 135 ) -> None: 136 """Override the cursor key for one or more streams. 137 138 Usage: 139 source.set_cursor_keys( 140 stream1="cursor1", 141 stream2="cursor2", 142 ) 143 144 Note: 145 - This does not unset previously set cursors. 146 - The cursor key must be a single field name. 147 - Not all streams support custom cursors. If a stream does not support custom cursors, 148 the override may be ignored. 149 - Stream names are case insensitive, while field names are case sensitive. 150 - Stream names are not validated by PyAirbyte. If the stream name 151 does not exist in the catalog, the override may be ignored. 152 """ 153 self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})
Override the cursor key for one or more streams.
Usage:
source.set_cursor_keys(
    stream1="cursor1",
    stream2="cursor2",
)
Note:
- This does not unset previously set cursors.
- The cursor key must be a single field name.
- Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
155 def set_primary_key( 156 self, 157 stream_name: str, 158 primary_key: str | list[str], 159 ) -> None: 160 """Set the primary key for a single stream. 161 162 Note: 163 - This does not unset previously set primary keys. 164 - The primary key must be a single field name or a list of field names. 165 - Not all streams support overriding primary keys. If a stream does not support overriding 166 primary keys, the override may be ignored. 167 - Stream names are case insensitive, while field names are case sensitive. 168 - Stream names are not validated by PyAirbyte. If the stream name 169 does not exist in the catalog, the override may be ignored. 170 """ 171 self._primary_key_overrides[stream_name.lower()] = ( 172 primary_key if isinstance(primary_key, list) else [primary_key] 173 )
Set the primary key for a single stream.
Note:
- This does not unset previously set primary keys.
- The primary key must be a single field name or a list of field names.
- Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
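Continuing the sketch above (stream and field names remain hypothetical), a single-field key is passed as a string and a composite key as a list:

    # Single-field primary key:
    source.set_primary_key("users", "id")

    # Composite primary key, as a list of field names:
    source.set_primary_key("purchases", ["user_id", "product_id"])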
175 def set_primary_keys( 176 self, 177 **kwargs: str | list[str], 178 ) -> None: 179 """Override the primary keys for one or more streams. 180 181 This does not unset previously set primary keys. 182 183 Usage: 184 source.set_primary_keys( 185 stream1="pk1", 186 stream2=["pk1", "pk2"], 187 ) 188 189 Note: 190 - This does not unset previously set primary keys. 191 - The primary key must be a single field name or a list of field names. 192 - Not all streams support overriding primary keys. If a stream does not support overriding 193 primary keys, the override may be ignored. 194 - Stream names are case insensitive, while field names are case sensitive. 195 - Stream names are not validated by PyAirbyte. If the stream name 196 does not exist in the catalog, the override may be ignored. 197 """ 198 self._primary_key_overrides.update( 199 {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()} 200 )
Override the primary keys for one or more streams.
This does not unset previously set primary keys.
Usage:
source.set_primary_keys(
    stream1="pk1",
    stream2=["pk1", "pk2"],
)
Note:
- This does not unset previously set primary keys.
- The primary key must be a single field name or a list of field names.
- Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
214 def select_all_streams(self) -> None: 215 """Select all streams. 216 217 This is a more streamlined equivalent to: 218 > source.select_streams(source.get_available_streams()). 219 """ 220 if self._config_dict is None: 221 self._to_be_selected_streams = "*" 222 self._log_warning_preselected_stream(self._to_be_selected_streams) 223 return 224 225 self._selected_stream_names = self.get_available_streams()
Select all streams.
This is a more streamlined equivalent to:
source.select_streams(source.get_available_streams()).
227 def select_streams(self, streams: str | list[str]) -> None: 228 """Select the stream names that should be read from the connector. 229 230 Args: 231 streams: A list of stream names to select. If set to "*", all streams will be selected. 232 233 Currently, if this is not set, all streams will be read. 234 """ 235 if self._config_dict is None: 236 self._to_be_selected_streams = streams 237 self._log_warning_preselected_stream(streams) 238 return 239 240 if streams == "*": 241 self.select_all_streams() 242 return 243 244 if isinstance(streams, str): 245 # If a single stream is provided, convert it to a one-item list 246 streams = [streams] 247 248 available_streams = self.get_available_streams() 249 for stream in streams: 250 if stream not in available_streams: 251 raise exc.AirbyteStreamNotFoundError( 252 stream_name=stream, 253 connector_name=self.name, 254 available_streams=available_streams, 255 ) 256 self._selected_stream_names = streams
Select the stream names that should be read from the connector.
Arguments:
- streams: A list of stream names to select. If set to "*", all streams will be selected.
Currently, if this is not set, all streams will be read.
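For example, a hedged sketch with hypothetical stream names; get_selected_streams() confirms the result:

    # Select a subset of streams; once the config is set, unknown names
    # raise AirbyteStreamNotFoundError.
    source.select_streams(["users", "purchases"])

    # Or select everything, which is equivalent to select_all_streams():
    source.select_streams("*")

    print(source.get_selected_streams())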
258 def get_selected_streams(self) -> list[str]: 259 """Get the selected streams. 260 261 If no streams are selected, return an empty list. 262 """ 263 return self._selected_stream_names
Get the selected streams.
If no streams are selected, return an empty list.
265 def set_config( 266 self, 267 config: dict[str, Any], 268 *, 269 validate: bool = True, 270 ) -> None: 271 """Set the config for the connector. 272 273 If validate is True, raise an exception if the config fails validation. 274 275 If validate is False, validation will be deferred until check() or validate_config() 276 is called. 277 """ 278 if validate: 279 self.validate_config(config) 280 281 self._config_dict = config 282 283 if self._to_be_selected_streams: 284 self.select_streams(self._to_be_selected_streams) 285 self._to_be_selected_streams = []
Set the config for the connector.
If validate is True, raise an exception if the config fails validation.
If validate is False, validation will be deferred until check() or validate_config() is called.
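A small sketch of deferred validation, assuming a hypothetical config dict; validate_config() (inherited from ConnectorBase) performs the deferred check explicitly:

    # Apply the config without validating it yet:
    source.set_config({"count": 5_000}, validate=False)

    # Validate on demand (this would also happen during check()):
    source.validate_config()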
287 def get_config(self) -> dict[str, Any]: 288 """Get the config for the connector.""" 289 return self._config
Get the config for the connector.
318 def get_available_streams(self) -> list[str]: 319 """Get the available streams from the spec.""" 320 return [s.name for s in self.discovered_catalog.streams]
Get the available stream names from the discovered catalog.
352 @property 353 def config_spec(self) -> dict[str, Any]: 354 """Generate a configuration spec for this connector, as a JSON Schema definition. 355 356 This function generates a JSON Schema dictionary with configuration specs for the 357 current connector, as a dictionary. 358 359 Returns: 360 dict: The JSON Schema configuration spec as a dictionary. 361 """ 362 return self._get_spec(force_refresh=True).connectionSpecification
Generate a configuration spec for this connector, as a JSON Schema definition.
This property returns the configuration spec for the current connector as a JSON Schema dictionary.
Returns:
dict: The JSON Schema configuration spec as a dictionary.
364 def print_config_spec( 365 self, 366 format: Literal["yaml", "json"] = "yaml", # noqa: A002 367 *, 368 output_file: Path | str | None = None, 369 ) -> None: 370 """Print the configuration spec for this connector. 371 372 Args: 373 format: The format to print the spec in. Must be "yaml" or "json". 374 output_file: Optional. If set, the spec will be written to the given file path. 375 Otherwise, it will be printed to the console. 376 """ 377 if format not in {"yaml", "json"}: 378 raise exc.PyAirbyteInputError( 379 message="Invalid format. Expected 'yaml' or 'json'", 380 input_value=format, 381 ) 382 if isinstance(output_file, str): 383 output_file = Path(output_file) 384 385 if format == "yaml": 386 content = yaml.dump(self.config_spec, indent=2) 387 elif format == "json": 388 content = json.dumps(self.config_spec, indent=2) 389 390 if output_file: 391 output_file.write_text(content) 392 return 393 394 syntax_highlighted = Syntax(content, format) 395 print(syntax_highlighted)
Print the configuration spec for this connector.
Arguments:
- format: The format to print the spec in. Must be "yaml" or "json".
- output_file: Optional. If set, the spec will be written to the given file path. Otherwise, it will be printed to the console.
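A sketch of both spec accessors in use; the output file path is arbitrary:

    # Inspect the spec programmatically (property names vary by connector):
    spec = source.config_spec
    print(list(spec.get("properties", {}).keys()))

    # Pretty-print the spec as YAML to the console:
    source.print_config_spec()

    # Or write it as JSON to a file instead of printing:
    source.print_config_spec(format="json", output_file="spec.json")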
412 @property 413 def docs_url(self) -> str: 414 """Get the URL to the connector's documentation.""" 415 return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace( 416 "source-", "" 417 )
Get the URL to the connector's documentation.
419 @property 420 def discovered_catalog(self) -> AirbyteCatalog: 421 """Get the raw catalog for the given streams. 422 423 If the catalog is not yet known, we call discover to get it. 424 """ 425 if self._discovered_catalog is None: 426 self._discovered_catalog = self._discover() 427 428 return self._discovered_catalog
Get the raw catalog for the given streams.
If the catalog is not yet known, we call discover to get it.
430 @property 431 def configured_catalog(self) -> ConfiguredAirbyteCatalog: 432 """Get the configured catalog for the given streams. 433 434 If the raw catalog is not yet known, we call discover to get it. 435 436 If no specific streams are selected, we return a catalog that syncs all available streams. 437 438 TODO: We should consider disabling by default the streams that the connector would 439 disable by default. (For instance, streams that require a premium license are sometimes 440 disabled by default within the connector.) 441 """ 442 # Ensure discovered catalog is cached before we start 443 _ = self.discovered_catalog 444 445 # Filter for selected streams if set, otherwise use all available streams: 446 streams_filter: list[str] = self._selected_stream_names or self.get_available_streams() 447 return self.get_configured_catalog(streams=streams_filter)
Get the configured catalog for the given streams.
If the raw catalog is not yet known, we call discover to get it.
If no specific streams are selected, we return a catalog that syncs all available streams.
TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)
449 def get_configured_catalog( 450 self, 451 streams: Literal["*"] | list[str] | None = None, 452 ) -> ConfiguredAirbyteCatalog: 453 """Get a configured catalog for the given streams. 454 455 If no streams are provided, the selected streams will be used. If no streams are selected, 456 all available streams will be used. 457 458 If '*' is provided, all available streams will be used. 459 """ 460 selected_streams: list[str] = [] 461 if streams is None: 462 selected_streams = self._selected_stream_names or self.get_available_streams() 463 elif streams == "*": 464 selected_streams = self.get_available_streams() 465 elif isinstance(streams, list): 466 selected_streams = streams 467 else: 468 raise exc.PyAirbyteInputError( 469 message="Invalid streams argument.", 470 input_value=streams, 471 ) 472 473 return ConfiguredAirbyteCatalog( 474 streams=[ 475 ConfiguredAirbyteStream( 476 stream=stream, 477 destination_sync_mode=DestinationSyncMode.overwrite, 478 sync_mode=SyncMode.incremental, 479 primary_key=( 480 [self._primary_key_overrides[stream.name.lower()]] 481 if stream.name.lower() in self._primary_key_overrides 482 else stream.source_defined_primary_key 483 ), 484 cursor_field=( 485 [self._cursor_key_overrides[stream.name.lower()]] 486 if stream.name.lower() in self._cursor_key_overrides 487 else stream.default_cursor_field 488 ), 489 # These are unused in the current implementation: 490 generation_id=None, 491 minimum_generation_id=None, 492 sync_id=None, 493 ) 494 for stream in self.discovered_catalog.streams 495 if stream.name in selected_streams 496 ], 497 )
Get a configured catalog for the given streams.
If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.
If '*' is provided, all available streams will be used.
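For example (stream name hypothetical), requesting a configured catalog for one stream; any cursor or primary-key overrides set earlier are reflected in the result:

    catalog = source.get_configured_catalog(streams=["users"])
    for configured_stream in catalog.streams:
        print(configured_stream.stream.name, configured_stream.cursor_field)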
499 def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]: 500 """Return the JSON Schema spec for the specified stream name.""" 501 catalog: AirbyteCatalog = self.discovered_catalog 502 found: list[AirbyteStream] = [ 503 stream for stream in catalog.streams if stream.name == stream_name 504 ] 505 506 if len(found) == 0: 507 raise exc.PyAirbyteInputError( 508 message="Stream name does not exist in catalog.", 509 input_value=stream_name, 510 ) 511 512 if len(found) > 1: 513 raise exc.PyAirbyteInternalError( 514 message="Duplicate streams found with the same name.", 515 context={ 516 "found_streams": found, 517 }, 518 ) 519 520 return found[0].json_schema
Return the JSON Schema spec for the specified stream name.
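A quick sketch, again assuming a hypothetical "users" stream:

    schema = source.get_stream_json_schema("users")
    # Top-level property names declared for the stream:
    print(list(schema.get("properties", {}).keys()))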
522 def get_records( 523 self, 524 stream: str, 525 *, 526 normalize_field_names: bool = False, 527 prune_undeclared_fields: bool = True, 528 ) -> LazyDataset: 529 """Read a stream from the connector. 530 531 Args: 532 stream: The name of the stream to read. 533 normalize_field_names: When `True`, field names will be normalized to lower case, with 534 special characters removed. This matches the behavior of PyAirbyte caches and most 535 Airbyte destinations. 536 prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records, 537 which generally matches the behavior of PyAirbyte caches and most Airbyte 538 destinations, specifically when you expect the catalog may be stale. You can disable 539 this to keep all fields in the records. 540 541 This involves the following steps: 542 * Call discover to get the catalog 543 * Generate a configured catalog that syncs the given stream in full_refresh mode 544 * Write the configured catalog and the config to a temporary file 545 * execute the connector with read --config <config_file> --catalog <catalog_file> 546 * Listen to the messages and return the first AirbyteRecordMessages that come along. 547 * Make sure the subprocess is killed when the function returns. 548 """ 549 configured_catalog = self.get_configured_catalog(streams=[stream]) 550 if len(configured_catalog.streams) == 0: 551 raise exc.PyAirbyteInputError( 552 message="Requested stream does not exist.", 553 context={ 554 "stream": stream, 555 "available_streams": self.get_available_streams(), 556 "connector_name": self.name, 557 }, 558 ) from KeyError(stream) 559 560 configured_stream = configured_catalog.streams[0] 561 562 def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]: 563 yield from records 564 565 stream_record_handler = StreamRecordHandler( 566 json_schema=self.get_stream_json_schema(stream), 567 prune_extra_fields=prune_undeclared_fields, 568 normalize_keys=normalize_field_names, 569 ) 570 571 # This method is non-blocking, so we use "PLAIN" to avoid a live progress display 572 progress_tracker = ProgressTracker( 573 ProgressStyle.PLAIN, 574 source=self, 575 cache=None, 576 destination=None, 577 expected_streams=[stream], 578 ) 579 580 iterator: Iterator[dict[str, Any]] = ( 581 StreamRecord.from_record_message( 582 record_message=record.record, 583 stream_record_handler=stream_record_handler, 584 ) 585 for record in self._read_with_catalog( 586 catalog=configured_catalog, 587 progress_tracker=progress_tracker, 588 ) 589 if record.record 590 ) 591 progress_tracker.log_success() 592 return LazyDataset( 593 iterator, 594 stream_metadata=configured_stream, 595 )
Read a stream from the connector.
Arguments:
- stream: The name of the stream to read.
- normalize_field_names: When True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
- prune_undeclared_fields: When True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.
This involves the following steps:
- Call discover to get the catalog
- Generate a configured catalog that syncs the given stream in full_refresh mode
- Write the configured catalog and the config to a temporary file
- Execute the connector with read --config <config_file> --catalog <catalog_file>
- Listen to the messages and return the first AirbyteRecordMessages that come along.
- Make sure the subprocess is killed when the function returns.
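Putting these steps together, a hedged sketch of lazy iteration (stream name hypothetical). Records arrive as the iterator is consumed, so you can stop early without reading the whole stream:

    for record in source.get_records("users", normalize_field_names=True):
        print(record)  # A dict-like StreamRecord with normalized keys
        break  # Stop after the first record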
597 def get_documents( 598 self, 599 stream: str, 600 title_property: str | None = None, 601 content_properties: list[str] | None = None, 602 metadata_properties: list[str] | None = None, 603 *, 604 render_metadata: bool = False, 605 ) -> Iterable[Document]: 606 """Read a stream from the connector and return the records as documents. 607 608 If metadata_properties is not set, all properties that are not content will be added to 609 the metadata. 610 611 If render_metadata is True, metadata will be rendered in the document, as well as the 612 the main content. 613 """ 614 return self.get_records(stream).to_documents( 615 title_property=title_property, 616 content_properties=content_properties, 617 metadata_properties=metadata_properties, 618 render_metadata=render_metadata, 619 )
Read a stream from the connector and return the records as documents.
If metadata_properties is not set, all properties that are not content will be added to the metadata.
If render_metadata is True, metadata will be rendered in the document along with the main content.
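For example (the property names are hypothetical and depend on the stream's JSON Schema):

    docs = source.get_documents(
        stream="users",
        title_property="name",
        content_properties=["bio"],
        metadata_properties=["id", "created_at"],
        render_metadata=True,
    )
    for doc in docs:
        print(doc)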
709 def read( 710 self, 711 cache: CacheBase | None = None, 712 *, 713 streams: str | list[str] | None = None, 714 write_strategy: str | WriteStrategy = WriteStrategy.AUTO, 715 force_full_refresh: bool = False, 716 skip_validation: bool = False, 717 ) -> ReadResult: 718 """Read from the connector and write to the cache. 719 720 Args: 721 cache: The cache to write to. If not set, a default cache will be used. 722 streams: Optional if already set. A list of stream names to select for reading. If set 723 to "*", all streams will be selected. 724 write_strategy: The strategy to use when writing to the cache. If a string, it must be 725 one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one 726 of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or 727 WriteStrategy.AUTO. 728 force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, 729 streams will be read in incremental mode if supported by the connector. This option 730 must be True when using the "replace" strategy. 731 skip_validation: If True, PyAirbyte will not pre-validate the input configuration before 732 running the connector. This can be helpful in debugging, when you want to send 733 configurations to the connector that otherwise might be rejected by JSON Schema 734 validation rules. 735 """ 736 cache = cache or get_default_cache() 737 progress_tracker = ProgressTracker( 738 source=self, 739 cache=cache, 740 destination=None, 741 expected_streams=None, # Will be set later 742 ) 743 744 # Set up state provider if not in full refresh mode 745 if force_full_refresh: 746 state_provider: StateProviderBase | None = None 747 else: 748 state_provider = cache.get_state_provider( 749 source_name=self._name, 750 ) 751 state_writer = cache.get_state_writer(source_name=self._name) 752 753 if streams: 754 self.select_streams(streams) 755 756 if not self._selected_stream_names: 757 raise exc.PyAirbyteNoStreamsSelectedError( 758 connector_name=self.name, 759 available_streams=self.get_available_streams(), 760 ) 761 762 try: 763 result = self._read_to_cache( 764 cache=cache, 765 catalog_provider=CatalogProvider(self.configured_catalog), 766 stream_names=self._selected_stream_names, 767 state_provider=state_provider, 768 state_writer=state_writer, 769 write_strategy=write_strategy, 770 force_full_refresh=force_full_refresh, 771 skip_validation=skip_validation, 772 progress_tracker=progress_tracker, 773 ) 774 except exc.PyAirbyteInternalError as ex: 775 progress_tracker.log_failure(exception=ex) 776 raise exc.AirbyteConnectorFailedError( 777 connector_name=self.name, 778 log_text=self._last_log_messages, 779 ) from ex 780 except Exception as ex: 781 progress_tracker.log_failure(exception=ex) 782 raise 783 784 progress_tracker.log_success() 785 return result
Read from the connector and write to the cache.
Arguments:
- cache: The cache to write to. If not set, a default cache will be used.
- streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
- write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
- force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
- skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
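End to end, a minimal sketch using the default cache. The connector, config, and stream names are hypothetical, and REPLACE is paired with force_full_refresh=True to avoid the data-loss warning described above:

    import airbyte as ab
    from airbyte.strategies import WriteStrategy

    source = ab.get_source("source-faker", config={"count": 1_000})
    source.select_streams("*")

    result = source.read(
        cache=ab.get_default_cache(),
        write_strategy=WriteStrategy.REPLACE,
        force_full_refresh=True,
    )

    # Each synced stream is available as a dataset on the result
    # (to_pandas() assumes pandas is installed):
    print(result["users"].to_pandas().head())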
Inherited Members
- airbyte._connector_base.ConnectorBase
- config_change_callback
- executor
- name
- config_hash
- validate_config
- connector_version
- check
- install
- uninstall