airbyte.sources.base
Base class implementation for sources.
1# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 2"""Base class implementation for sources.""" 3 4from __future__ import annotations 5 6import sys 7import threading 8import time 9import warnings 10from itertools import islice 11from typing import TYPE_CHECKING, Any, Literal 12 13import yaml 14from rich import print # noqa: A004 # Allow shadowing the built-in 15from rich.console import Console 16from rich.markdown import Markdown 17from rich.markup import escape 18from rich.table import Table 19from typing_extensions import override 20 21from airbyte_protocol.models import ( 22 AirbyteCatalog, 23 AirbyteMessage, 24 ConfiguredAirbyteCatalog, 25 ConfiguredAirbyteStream, 26 DestinationSyncMode, 27 SyncMode, 28 Type, 29) 30 31from airbyte import exceptions as exc 32from airbyte._connector_base import ConnectorBase 33from airbyte._message_iterators import AirbyteMessageIterator 34from airbyte._util.temp_files import as_temp_files 35from airbyte.caches.util import get_default_cache 36from airbyte.datasets._lazy import LazyDataset 37from airbyte.progress import ProgressStyle, ProgressTracker 38from airbyte.records import StreamRecord, StreamRecordHandler 39from airbyte.results import ReadResult 40from airbyte.shared.catalog_providers import CatalogProvider 41from airbyte.strategies import WriteStrategy 42 43 44if TYPE_CHECKING: 45 from collections.abc import Generator, Iterable, Iterator 46 47 from airbyte_protocol.models import ( 48 AirbyteStream, 49 ConnectorSpecification, 50 ) 51 52 from airbyte._executors.base import Executor 53 from airbyte.caches import CacheBase 54 from airbyte.callbacks import ConfigChangeCallback 55 from airbyte.datasets._inmemory import InMemoryDataset 56 from airbyte.documents import Document 57 from airbyte.shared.state_providers import StateProviderBase 58 from airbyte.shared.state_writers import StateWriterBase 59 60from airbyte.constants import ( 61 AB_EXTRACTED_AT_COLUMN, 62 AB_META_COLUMN, 63 AB_RAW_ID_COLUMN, 64) 65 66 67class Source(ConnectorBase): # noqa: PLR0904 68 """A class representing a source that can be called.""" 69 70 connector_type = "source" 71 72 def __init__( 73 self, 74 executor: Executor, 75 name: str, 76 config: dict[str, Any] | None = None, 77 *, 78 config_change_callback: ConfigChangeCallback | None = None, 79 streams: str | list[str] | None = None, 80 validate: bool = False, 81 cursor_key_overrides: dict[str, str] | None = None, 82 primary_key_overrides: dict[str, str | list[str]] | None = None, 83 ) -> None: 84 """Initialize the source. 85 86 If config is provided, it will be validated against the spec if validate is True. 87 """ 88 self._to_be_selected_streams: list[str] | str = [] 89 """Used to hold selection criteria before catalog is known.""" 90 91 super().__init__( 92 executor=executor, 93 name=name, 94 config=config, 95 config_change_callback=config_change_callback, 96 validate=validate, 97 ) 98 self._config_dict: dict[str, Any] | None = None 99 self._last_log_messages: list[str] = [] 100 self._discovered_catalog: AirbyteCatalog | None = None 101 self._selected_stream_names: list[str] = [] 102 103 self._cursor_key_overrides: dict[str, str] = {} 104 """A mapping of lower-cased stream names to cursor key overrides.""" 105 106 self._primary_key_overrides: dict[str, list[str]] = {} 107 """A mapping of lower-cased stream names to primary key overrides.""" 108 109 if config is not None: 110 self.set_config(config, validate=validate) 111 if streams is not None: 112 self.select_streams(streams) 113 if cursor_key_overrides is not None: 114 self.set_cursor_keys(**cursor_key_overrides) 115 if primary_key_overrides is not None: 116 self.set_primary_keys(**primary_key_overrides) 117 118 def set_streams(self, streams: list[str]) -> None: 119 """Deprecated. See select_streams().""" 120 warnings.warn( 121 "The 'set_streams' method is deprecated and will be removed in a future version. " 122 "Please use the 'select_streams' method instead.", 123 DeprecationWarning, 124 stacklevel=2, 125 ) 126 self.select_streams(streams) 127 128 def set_cursor_key( 129 self, 130 stream_name: str, 131 cursor_key: str, 132 ) -> None: 133 """Set the cursor for a single stream. 134 135 Note: 136 - This does not unset previously set cursors. 137 - The cursor key must be a single field name. 138 - Not all streams support custom cursors. If a stream does not support custom cursors, 139 the override may be ignored. 140 - Stream names are case insensitive, while field names are case sensitive. 141 - Stream names are not validated by PyAirbyte. If the stream name 142 does not exist in the catalog, the override may be ignored. 143 """ 144 self._cursor_key_overrides[stream_name.lower()] = cursor_key 145 146 def set_cursor_keys( 147 self, 148 **kwargs: str, 149 ) -> None: 150 """Override the cursor key for one or more streams. 151 152 Usage: 153 source.set_cursor_keys( 154 stream1="cursor1", 155 stream2="cursor2", 156 ) 157 158 Note: 159 - This does not unset previously set cursors. 160 - The cursor key must be a single field name. 161 - Not all streams support custom cursors. If a stream does not support custom cursors, 162 the override may be ignored. 163 - Stream names are case insensitive, while field names are case sensitive. 164 - Stream names are not validated by PyAirbyte. If the stream name 165 does not exist in the catalog, the override may be ignored. 166 """ 167 self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()}) 168 169 def set_primary_key( 170 self, 171 stream_name: str, 172 primary_key: str | list[str], 173 ) -> None: 174 """Set the primary key for a single stream. 175 176 Note: 177 - This does not unset previously set primary keys. 178 - The primary key must be a single field name or a list of field names. 179 - Not all streams support overriding primary keys. If a stream does not support overriding 180 primary keys, the override may be ignored. 181 - Stream names are case insensitive, while field names are case sensitive. 182 - Stream names are not validated by PyAirbyte. If the stream name 183 does not exist in the catalog, the override may be ignored. 184 """ 185 self._primary_key_overrides[stream_name.lower()] = ( 186 primary_key if isinstance(primary_key, list) else [primary_key] 187 ) 188 189 def set_primary_keys( 190 self, 191 **kwargs: str | list[str], 192 ) -> None: 193 """Override the primary keys for one or more streams. 194 195 This does not unset previously set primary keys. 196 197 Usage: 198 source.set_primary_keys( 199 stream1="pk1", 200 stream2=["pk1", "pk2"], 201 ) 202 203 Note: 204 - This does not unset previously set primary keys. 205 - The primary key must be a single field name or a list of field names. 206 - Not all streams support overriding primary keys. If a stream does not support overriding 207 primary keys, the override may be ignored. 208 - Stream names are case insensitive, while field names are case sensitive. 209 - Stream names are not validated by PyAirbyte. If the stream name 210 does not exist in the catalog, the override may be ignored. 211 """ 212 self._primary_key_overrides.update( 213 {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()} 214 ) 215 216 def _log_warning_preselected_stream(self, streams: str | list[str]) -> None: 217 """Logs a warning message indicating stream selection which are not selected yet.""" 218 if streams == "*": 219 print( 220 "Warning: Config is not set yet. All streams will be selected after config is set.", 221 file=sys.stderr, 222 ) 223 else: 224 print( 225 "Warning: Config is not set yet. " 226 f"Streams to be selected after config is set: {streams}", 227 file=sys.stderr, 228 ) 229 230 def select_all_streams(self) -> None: 231 """Select all streams. 232 233 This is a more streamlined equivalent to: 234 > source.select_streams(source.get_available_streams()). 235 """ 236 if self._config_dict is None: 237 self._to_be_selected_streams = "*" 238 self._log_warning_preselected_stream(self._to_be_selected_streams) 239 return 240 241 self._selected_stream_names = self.get_available_streams() 242 243 def select_streams(self, streams: str | list[str]) -> None: 244 """Select the stream names that should be read from the connector. 245 246 Args: 247 streams: A list of stream names to select. If set to "*", all streams will be selected. 248 249 Currently, if this is not set, all streams will be read. 250 """ 251 if self._config_dict is None: 252 self._to_be_selected_streams = streams 253 self._log_warning_preselected_stream(streams) 254 return 255 256 if streams == "*": 257 self.select_all_streams() 258 return 259 260 if isinstance(streams, str): 261 # If a single stream is provided, convert it to a one-item list 262 streams = [streams] 263 264 available_streams = self.get_available_streams() 265 for stream in streams: 266 if stream not in available_streams: 267 raise exc.AirbyteStreamNotFoundError( 268 stream_name=stream, 269 connector_name=self.name, 270 available_streams=available_streams, 271 ) 272 self._selected_stream_names = streams 273 274 def get_selected_streams(self) -> list[str]: 275 """Get the selected streams. 276 277 If no streams are selected, return an empty list. 278 """ 279 return self._selected_stream_names 280 281 def set_config( 282 self, 283 config: dict[str, Any], 284 *, 285 validate: bool = True, 286 ) -> None: 287 """Set the config for the connector. 288 289 If validate is True, raise an exception if the config fails validation. 290 291 If validate is False, validation will be deferred until check() or validate_config() 292 is called. 293 """ 294 if validate: 295 self.validate_config(config) 296 297 self._config_dict = config 298 299 if self._to_be_selected_streams: 300 self.select_streams(self._to_be_selected_streams) 301 self._to_be_selected_streams = [] 302 303 def _discover(self) -> AirbyteCatalog: 304 """Call discover on the connector. 305 306 This involves the following steps: 307 - Write the config to a temporary file 308 - execute the connector with discover --config <config_file> 309 - Listen to the messages and return the first AirbyteCatalog that comes along. 310 - Make sure the subprocess is killed when the function returns. 311 """ 312 with as_temp_files([self._hydrated_config]) as [config_file]: 313 for msg in self._execute(["discover", "--config", config_file]): 314 if msg.type == Type.CATALOG and msg.catalog: 315 return msg.catalog 316 raise exc.AirbyteConnectorMissingCatalogError( 317 connector_name=self.name, 318 log_text=self._last_log_messages, 319 ) 320 321 def get_available_streams(self) -> list[str]: 322 """Get the available streams from the spec.""" 323 return [s.name for s in self.discovered_catalog.streams] 324 325 def _get_incremental_stream_names(self) -> list[str]: 326 """Get the name of streams that support incremental sync.""" 327 return [ 328 stream.name 329 for stream in self.discovered_catalog.streams 330 if SyncMode.incremental in stream.supported_sync_modes 331 ] 332 333 @override 334 def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification: 335 """Call spec on the connector. 336 337 This involves the following steps: 338 * execute the connector with spec 339 * Listen to the messages and return the first AirbyteCatalog that comes along. 340 * Make sure the subprocess is killed when the function returns. 341 """ 342 if force_refresh or self._spec is None: 343 for msg in self._execute(["spec"]): 344 if msg.type == Type.SPEC and msg.spec: 345 self._spec = msg.spec 346 break 347 348 if self._spec: 349 return self._spec 350 351 raise exc.AirbyteConnectorMissingSpecError( 352 connector_name=self.name, 353 log_text=self._last_log_messages, 354 ) 355 356 @property 357 def config_spec(self) -> dict[str, Any]: 358 """Generate a configuration spec for this connector, as a JSON Schema definition. 359 360 This function generates a JSON Schema dictionary with configuration specs for the 361 current connector, as a dictionary. 362 363 Returns: 364 dict: The JSON Schema configuration spec as a dictionary. 365 """ 366 return self._get_spec(force_refresh=True).connectionSpecification 367 368 @property 369 def _yaml_spec(self) -> str: 370 """Get the spec as a yaml string. 371 372 For now, the primary use case is for writing and debugging a valid config for a source. 373 374 This is private for now because we probably want better polish before exposing this 375 as a stable interface. This will also get easier when we have docs links with this info 376 for each connector. 377 """ 378 spec_obj: ConnectorSpecification = self._get_spec() 379 spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True) 380 # convert to a yaml string 381 return yaml.dump(spec_dict) 382 383 @property 384 def docs_url(self) -> str: 385 """Get the URL to the connector's documentation.""" 386 return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace( 387 "source-", "" 388 ) 389 390 @property 391 def discovered_catalog(self) -> AirbyteCatalog: 392 """Get the raw catalog for the given streams. 393 394 If the catalog is not yet known, we call discover to get it. 395 """ 396 if self._discovered_catalog is None: 397 self._discovered_catalog = self._discover() 398 399 return self._discovered_catalog 400 401 @property 402 def configured_catalog(self) -> ConfiguredAirbyteCatalog: 403 """Get the configured catalog for the given streams. 404 405 If the raw catalog is not yet known, we call discover to get it. 406 407 If no specific streams are selected, we return a catalog that syncs all available streams. 408 409 TODO: We should consider disabling by default the streams that the connector would 410 disable by default. (For instance, streams that require a premium license are sometimes 411 disabled by default within the connector.) 412 """ 413 # Ensure discovered catalog is cached before we start 414 _ = self.discovered_catalog 415 416 # Filter for selected streams if set, otherwise use all available streams: 417 streams_filter: list[str] = self._selected_stream_names or self.get_available_streams() 418 return self.get_configured_catalog(streams=streams_filter) 419 420 def get_configured_catalog( 421 self, 422 streams: Literal["*"] | list[str] | None = None, 423 *, 424 force_full_refresh: bool = False, 425 ) -> ConfiguredAirbyteCatalog: 426 """Get a configured catalog for the given streams. 427 428 If no streams are provided, the selected streams will be used. If no streams are selected, 429 all available streams will be used. 430 431 If '*' is provided, all available streams will be used. 432 433 If force_full_refresh is True, streams will be configured with full_refresh sync mode 434 when supported by the stream. Otherwise, incremental sync mode is used when supported. 435 """ 436 selected_streams: list[str] = [] 437 if streams is None: 438 selected_streams = self._selected_stream_names or self.get_available_streams() 439 elif streams == "*": 440 selected_streams = self.get_available_streams() 441 elif isinstance(streams, list): 442 selected_streams = streams 443 else: 444 raise exc.PyAirbyteInputError( 445 message="Invalid streams argument.", 446 input_value=streams, 447 ) 448 449 def _get_sync_mode(stream: AirbyteStream) -> SyncMode: 450 """Determine the sync mode for a stream based on force_full_refresh and support.""" 451 # Use getattr to handle mocks or streams without supported_sync_modes attribute 452 supported_modes = getattr(stream, "supported_sync_modes", None) 453 454 if force_full_refresh: 455 # When force_full_refresh is True, prefer full_refresh if supported 456 if supported_modes and SyncMode.full_refresh in supported_modes: 457 return SyncMode.full_refresh 458 # Fall back to incremental if full_refresh is not supported 459 return SyncMode.incremental 460 461 # Default behavior: preserve previous semantics (always incremental) 462 return SyncMode.incremental 463 464 return ConfiguredAirbyteCatalog( 465 streams=[ 466 ConfiguredAirbyteStream( 467 stream=stream, 468 destination_sync_mode=DestinationSyncMode.overwrite, 469 sync_mode=_get_sync_mode(stream), 470 primary_key=( 471 [self._primary_key_overrides[stream.name.lower()]] 472 if stream.name.lower() in self._primary_key_overrides 473 else stream.source_defined_primary_key 474 ), 475 cursor_field=( 476 [self._cursor_key_overrides[stream.name.lower()]] 477 if stream.name.lower() in self._cursor_key_overrides 478 else stream.default_cursor_field 479 ), 480 # These are unused in the current implementation: 481 generation_id=None, 482 minimum_generation_id=None, 483 sync_id=None, 484 ) 485 for stream in self.discovered_catalog.streams 486 if stream.name in selected_streams 487 ], 488 ) 489 490 def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]: 491 """Return the JSON Schema spec for the specified stream name.""" 492 catalog: AirbyteCatalog = self.discovered_catalog 493 found: list[AirbyteStream] = [ 494 stream for stream in catalog.streams if stream.name == stream_name 495 ] 496 497 if len(found) == 0: 498 raise exc.PyAirbyteInputError( 499 message="Stream name does not exist in catalog.", 500 input_value=stream_name, 501 ) 502 503 if len(found) > 1: 504 raise exc.PyAirbyteInternalError( 505 message="Duplicate streams found with the same name.", 506 context={ 507 "found_streams": found, 508 }, 509 ) 510 511 return found[0].json_schema 512 513 def get_records( 514 self, 515 stream: str, 516 *, 517 limit: int | None = None, 518 stop_event: threading.Event | None = None, 519 normalize_field_names: bool = False, 520 prune_undeclared_fields: bool = True, 521 ) -> LazyDataset: 522 """Read a stream from the connector. 523 524 Args: 525 stream: The name of the stream to read. 526 limit: The maximum number of records to read. If None, all records will be read. 527 stop_event: If set, the event can be triggered by the caller to stop reading records 528 and terminate the process. 529 normalize_field_names: When `True`, field names will be normalized to lower case, with 530 special characters removed. This matches the behavior of PyAirbyte caches and most 531 Airbyte destinations. 532 prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records, 533 which generally matches the behavior of PyAirbyte caches and most Airbyte 534 destinations, specifically when you expect the catalog may be stale. You can disable 535 this to keep all fields in the records. 536 537 This involves the following steps: 538 * Call discover to get the catalog 539 * Generate a configured catalog that syncs the given stream in full_refresh mode 540 * Write the configured catalog and the config to a temporary file 541 * execute the connector with read --config <config_file> --catalog <catalog_file> 542 * Listen to the messages and return the first AirbyteRecordMessages that come along. 543 * Make sure the subprocess is killed when the function returns. 544 """ 545 stop_event = stop_event or threading.Event() 546 configured_catalog = self.get_configured_catalog(streams=[stream]) 547 if len(configured_catalog.streams) == 0: 548 raise exc.PyAirbyteInputError( 549 message="Requested stream does not exist.", 550 context={ 551 "stream": stream, 552 "available_streams": self.get_available_streams(), 553 "connector_name": self.name, 554 }, 555 ) from KeyError(stream) 556 557 configured_stream = configured_catalog.streams[0] 558 559 def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]: 560 yield from records 561 562 stream_record_handler = StreamRecordHandler( 563 json_schema=self.get_stream_json_schema(stream), 564 prune_extra_fields=prune_undeclared_fields, 565 normalize_keys=normalize_field_names, 566 ) 567 568 # This method is non-blocking, so we use "PLAIN" to avoid a live progress display 569 progress_tracker = ProgressTracker( 570 ProgressStyle.PLAIN, 571 source=self, 572 cache=None, 573 destination=None, 574 expected_streams=[stream], 575 ) 576 577 iterator: Iterator[dict[str, Any]] = ( 578 StreamRecord.from_record_message( 579 record_message=record.record, 580 stream_record_handler=stream_record_handler, 581 ) 582 for record in self._read_with_catalog( 583 catalog=configured_catalog, 584 progress_tracker=progress_tracker, 585 stop_event=stop_event, 586 ) 587 if record.record 588 ) 589 if limit is not None: 590 # Stop the iterator after the limit is reached 591 iterator = islice(iterator, limit) 592 593 return LazyDataset( 594 iterator, 595 stream_metadata=configured_stream, 596 stop_event=stop_event, 597 progress_tracker=progress_tracker, 598 ) 599 600 def get_documents( 601 self, 602 stream: str, 603 title_property: str | None = None, 604 content_properties: list[str] | None = None, 605 metadata_properties: list[str] | None = None, 606 *, 607 render_metadata: bool = False, 608 ) -> Iterable[Document]: 609 """Read a stream from the connector and return the records as documents. 610 611 If metadata_properties is not set, all properties that are not content will be added to 612 the metadata. 613 614 If render_metadata is True, metadata will be rendered in the document, as well as the 615 the main content. 616 """ 617 return self.get_records(stream).to_documents( 618 title_property=title_property, 619 content_properties=content_properties, 620 metadata_properties=metadata_properties, 621 render_metadata=render_metadata, 622 ) 623 624 def get_samples( 625 self, 626 streams: list[str] | Literal["*"] | None = None, 627 *, 628 limit: int = 5, 629 on_error: Literal["raise", "ignore", "log"] = "raise", 630 ) -> dict[str, InMemoryDataset | None]: 631 """Get a sample of records from the given streams.""" 632 if streams == "*": 633 streams = self.get_available_streams() 634 elif streams is None: 635 streams = self.get_selected_streams() 636 637 results: dict[str, InMemoryDataset | None] = {} 638 for stream in streams: 639 stop_event = threading.Event() 640 try: 641 results[stream] = self.get_records( 642 stream, 643 limit=limit, 644 stop_event=stop_event, 645 ).fetch_all() 646 stop_event.set() 647 except Exception as ex: 648 results[stream] = None 649 if on_error == "ignore": 650 continue 651 652 if on_error == "raise": 653 raise ex from None 654 655 if on_error == "log": 656 print(f"Error fetching sample for stream '{stream}': {ex}") 657 658 return results 659 660 def print_samples( 661 self, 662 streams: list[str] | Literal["*"] | None = None, 663 *, 664 limit: int = 5, 665 on_error: Literal["raise", "ignore", "log"] = "log", 666 ) -> None: 667 """Print a sample of records from the given streams.""" 668 internal_cols: list[str] = [ 669 AB_EXTRACTED_AT_COLUMN, 670 AB_META_COLUMN, 671 AB_RAW_ID_COLUMN, 672 ] 673 col_limit = 10 674 if streams == "*": 675 streams = self.get_available_streams() 676 elif streams is None: 677 streams = self.get_selected_streams() 678 679 console = Console() 680 681 console.print( 682 Markdown( 683 f"# Sample Records from `{self.name}` ({len(streams)} selected streams)", 684 justify="left", 685 ) 686 ) 687 688 for stream in streams: 689 console.print(Markdown(f"## `{stream}` Stream Sample", justify="left")) 690 samples = self.get_samples( 691 streams=[stream], 692 limit=limit, 693 on_error=on_error, 694 ) 695 dataset = samples[stream] 696 697 table = Table( 698 show_header=True, 699 show_lines=True, 700 ) 701 if dataset is None: 702 console.print( 703 Markdown("**⚠️ `Error fetching sample records.` ⚠️**"), 704 ) 705 continue 706 707 if len(dataset.column_names) > col_limit: 708 # We'll pivot the columns so each column is its own row 709 table.add_column("Column Name") 710 for _ in range(len(dataset)): 711 table.add_column(overflow="fold") 712 for col in dataset.column_names: 713 table.add_row( 714 Markdown(f"**`{col}`**"), 715 *[escape(str(record[col])) for record in dataset], 716 ) 717 else: 718 for col in dataset.column_names: 719 table.add_column( 720 Markdown(f"**`{col}`**"), 721 overflow="fold", 722 ) 723 724 for record in dataset: 725 table.add_row( 726 *[ 727 escape(str(val)) 728 for key, val in record.items() 729 # Exclude internal Airbyte columns. 730 if key not in internal_cols 731 ] 732 ) 733 734 console.print(table) 735 736 console.print(Markdown("--------------")) 737 738 def _get_airbyte_message_iterator( 739 self, 740 *, 741 streams: Literal["*"] | list[str] | None = None, 742 state_provider: StateProviderBase | None = None, 743 progress_tracker: ProgressTracker, 744 force_full_refresh: bool = False, 745 ) -> AirbyteMessageIterator: 746 """Get an AirbyteMessageIterator for this source.""" 747 return AirbyteMessageIterator( 748 self._read_with_catalog( 749 catalog=self.get_configured_catalog( 750 streams=streams, 751 force_full_refresh=force_full_refresh, 752 ), 753 state=state_provider if not force_full_refresh else None, 754 progress_tracker=progress_tracker, 755 ) 756 ) 757 758 def _read_with_catalog( 759 self, 760 catalog: ConfiguredAirbyteCatalog, 761 progress_tracker: ProgressTracker, 762 *, 763 state: StateProviderBase | None = None, 764 stop_event: threading.Event | None = None, 765 ) -> Generator[AirbyteMessage, None, None]: 766 """Call read on the connector. 767 768 This involves the following steps: 769 * Write the config to a temporary file 770 * execute the connector with read --config <config_file> --catalog <catalog_file> 771 * Listen to the messages and return the AirbyteRecordMessages that come along. 772 * Send out telemetry on the performed sync (with information about which source was used and 773 the type of the cache) 774 """ 775 with as_temp_files( 776 [ 777 self._hydrated_config, 778 catalog.model_dump_json(exclude_none=True), 779 state.to_state_input_file_text() if state else "[]", 780 ] 781 ) as [ 782 config_file, 783 catalog_file, 784 state_file, 785 ]: 786 message_generator = self._execute( 787 [ 788 "read", 789 "--config", 790 config_file, 791 "--catalog", 792 catalog_file, 793 "--state", 794 state_file, 795 ], 796 progress_tracker=progress_tracker, 797 ) 798 for message in progress_tracker.tally_records_read(message_generator): 799 if stop_event and stop_event.is_set(): 800 progress_tracker._log_sync_cancel() # noqa: SLF001 801 time.sleep(0.1) 802 return 803 804 yield message 805 806 progress_tracker.log_read_complete() 807 808 def _peek_airbyte_message( 809 self, 810 message: AirbyteMessage, 811 *, 812 raise_on_error: bool = True, 813 ) -> None: 814 """Process an Airbyte message. 815 816 This method handles reading Airbyte messages and taking action, if needed, based on the 817 message type. For instance, log messages are logged, records are tallied, and errors are 818 raised as exceptions if `raise_on_error` is True. 819 820 Raises: 821 AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted. 822 """ 823 super()._peek_airbyte_message(message, raise_on_error=raise_on_error) 824 825 def _log_incremental_streams( 826 self, 827 *, 828 incremental_streams: set[str] | None = None, 829 ) -> None: 830 """Log the streams which are using incremental sync mode.""" 831 log_message = ( 832 "The following streams are currently using incremental sync:\n" 833 f"{incremental_streams}\n" 834 "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method." 835 ) 836 print(log_message, file=sys.stderr) 837 838 def read( 839 self, 840 cache: CacheBase | None = None, 841 *, 842 streams: str | list[str] | None = None, 843 write_strategy: str | WriteStrategy = WriteStrategy.AUTO, 844 force_full_refresh: bool = False, 845 skip_validation: bool = False, 846 ) -> ReadResult: 847 """Read from the connector and write to the cache. 848 849 Args: 850 cache: The cache to write to. If not set, a default cache will be used. 851 streams: Optional if already set. A list of stream names to select for reading. If set 852 to "*", all streams will be selected. 853 write_strategy: The strategy to use when writing to the cache. If a string, it must be 854 one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one 855 of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or 856 WriteStrategy.AUTO. 857 force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, 858 streams will be read in incremental mode if supported by the connector. This option 859 must be True when using the "replace" strategy. 860 skip_validation: If True, PyAirbyte will not pre-validate the input configuration before 861 running the connector. This can be helpful in debugging, when you want to send 862 configurations to the connector that otherwise might be rejected by JSON Schema 863 validation rules. 864 """ 865 cache = cache or get_default_cache() 866 progress_tracker = ProgressTracker( 867 source=self, 868 cache=cache, 869 destination=None, 870 expected_streams=None, # Will be set later 871 ) 872 873 # Set up state provider if not in full refresh mode 874 if force_full_refresh: 875 state_provider: StateProviderBase | None = None 876 else: 877 state_provider = cache.get_state_provider( 878 source_name=self._name, 879 ) 880 state_writer = cache.get_state_writer(source_name=self._name) 881 882 if streams: 883 self.select_streams(streams) 884 885 if not self._selected_stream_names: 886 raise exc.PyAirbyteNoStreamsSelectedError( 887 connector_name=self.name, 888 available_streams=self.get_available_streams(), 889 ) 890 891 try: 892 result = self._read_to_cache( 893 cache=cache, 894 catalog_provider=CatalogProvider( 895 self.get_configured_catalog(force_full_refresh=force_full_refresh) 896 ), 897 stream_names=self._selected_stream_names, 898 state_provider=state_provider, 899 state_writer=state_writer, 900 write_strategy=write_strategy, 901 force_full_refresh=force_full_refresh, 902 skip_validation=skip_validation, 903 progress_tracker=progress_tracker, 904 ) 905 except exc.PyAirbyteInternalError as ex: 906 progress_tracker.log_failure(exception=ex) 907 raise exc.AirbyteConnectorFailedError( 908 connector_name=self.name, 909 log_text=self._last_log_messages, 910 ) from ex 911 except Exception as ex: 912 progress_tracker.log_failure(exception=ex) 913 raise 914 915 progress_tracker.log_success() 916 return result 917 918 def _read_to_cache( # noqa: PLR0913 # Too many arguments 919 self, 920 cache: CacheBase, 921 *, 922 catalog_provider: CatalogProvider, 923 stream_names: list[str], 924 state_provider: StateProviderBase | None, 925 state_writer: StateWriterBase | None, 926 write_strategy: str | WriteStrategy = WriteStrategy.AUTO, 927 force_full_refresh: bool = False, 928 skip_validation: bool = False, 929 progress_tracker: ProgressTracker, 930 ) -> ReadResult: 931 """Internal read method.""" 932 if write_strategy == WriteStrategy.REPLACE and not force_full_refresh: 933 warnings.warn( 934 message=( 935 "Using `REPLACE` strategy without also setting `force_full_refresh=True` " 936 "could result in data loss. " 937 "To silence this warning, use the following: " 938 'warnings.filterwarnings("ignore", ' 939 'category="airbyte.warnings.PyAirbyteDataLossWarning")`' 940 ), 941 category=exc.PyAirbyteDataLossWarning, 942 stacklevel=1, 943 ) 944 if isinstance(write_strategy, str): 945 try: 946 write_strategy = WriteStrategy(write_strategy) 947 except ValueError: 948 raise exc.PyAirbyteInputError( 949 message="Invalid strategy", 950 context={ 951 "write_strategy": write_strategy, 952 "available_strategies": [ 953 s.value 954 for s in WriteStrategy # pyrefly: ignore[not-iterable] 955 ], 956 }, 957 ) from None 958 959 # Run optional validation step 960 if not skip_validation: 961 self.validate_config() 962 963 # Log incremental stream if incremental streams are known 964 if state_provider and state_provider.known_stream_names: 965 # Retrieve set of the known streams support which support incremental sync 966 incremental_streams = ( 967 set(self._get_incremental_stream_names()) 968 & state_provider.known_stream_names 969 & set(self.get_selected_streams()) 970 ) 971 if incremental_streams: 972 self._log_incremental_streams(incremental_streams=incremental_streams) 973 974 airbyte_message_iterator = AirbyteMessageIterator( 975 self._read_with_catalog( 976 catalog=catalog_provider.configured_catalog, 977 state=state_provider, 978 progress_tracker=progress_tracker, 979 ) 980 ) 981 cache._write_airbyte_message_stream( # noqa: SLF001 # Non-public API 982 stdin=airbyte_message_iterator, 983 catalog_provider=catalog_provider, 984 write_strategy=write_strategy, 985 state_writer=state_writer, 986 progress_tracker=progress_tracker, 987 ) 988 989 # Flush the WAL, if applicable 990 cache.processor._do_checkpoint() # noqa: SLF001 # Non-public API 991 992 return ReadResult( 993 source_name=self.name, 994 progress_tracker=progress_tracker, 995 processed_streams=stream_names, 996 cache=cache, 997 ) 998 999 1000__all__ = [ 1001 "Source", 1002]
68class Source(ConnectorBase): # noqa: PLR0904 69 """A class representing a source that can be called.""" 70 71 connector_type = "source" 72 73 def __init__( 74 self, 75 executor: Executor, 76 name: str, 77 config: dict[str, Any] | None = None, 78 *, 79 config_change_callback: ConfigChangeCallback | None = None, 80 streams: str | list[str] | None = None, 81 validate: bool = False, 82 cursor_key_overrides: dict[str, str] | None = None, 83 primary_key_overrides: dict[str, str | list[str]] | None = None, 84 ) -> None: 85 """Initialize the source. 86 87 If config is provided, it will be validated against the spec if validate is True. 88 """ 89 self._to_be_selected_streams: list[str] | str = [] 90 """Used to hold selection criteria before catalog is known.""" 91 92 super().__init__( 93 executor=executor, 94 name=name, 95 config=config, 96 config_change_callback=config_change_callback, 97 validate=validate, 98 ) 99 self._config_dict: dict[str, Any] | None = None 100 self._last_log_messages: list[str] = [] 101 self._discovered_catalog: AirbyteCatalog | None = None 102 self._selected_stream_names: list[str] = [] 103 104 self._cursor_key_overrides: dict[str, str] = {} 105 """A mapping of lower-cased stream names to cursor key overrides.""" 106 107 self._primary_key_overrides: dict[str, list[str]] = {} 108 """A mapping of lower-cased stream names to primary key overrides.""" 109 110 if config is not None: 111 self.set_config(config, validate=validate) 112 if streams is not None: 113 self.select_streams(streams) 114 if cursor_key_overrides is not None: 115 self.set_cursor_keys(**cursor_key_overrides) 116 if primary_key_overrides is not None: 117 self.set_primary_keys(**primary_key_overrides) 118 119 def set_streams(self, streams: list[str]) -> None: 120 """Deprecated. See select_streams().""" 121 warnings.warn( 122 "The 'set_streams' method is deprecated and will be removed in a future version. " 123 "Please use the 'select_streams' method instead.", 124 DeprecationWarning, 125 stacklevel=2, 126 ) 127 self.select_streams(streams) 128 129 def set_cursor_key( 130 self, 131 stream_name: str, 132 cursor_key: str, 133 ) -> None: 134 """Set the cursor for a single stream. 135 136 Note: 137 - This does not unset previously set cursors. 138 - The cursor key must be a single field name. 139 - Not all streams support custom cursors. If a stream does not support custom cursors, 140 the override may be ignored. 141 - Stream names are case insensitive, while field names are case sensitive. 142 - Stream names are not validated by PyAirbyte. If the stream name 143 does not exist in the catalog, the override may be ignored. 144 """ 145 self._cursor_key_overrides[stream_name.lower()] = cursor_key 146 147 def set_cursor_keys( 148 self, 149 **kwargs: str, 150 ) -> None: 151 """Override the cursor key for one or more streams. 152 153 Usage: 154 source.set_cursor_keys( 155 stream1="cursor1", 156 stream2="cursor2", 157 ) 158 159 Note: 160 - This does not unset previously set cursors. 161 - The cursor key must be a single field name. 162 - Not all streams support custom cursors. If a stream does not support custom cursors, 163 the override may be ignored. 164 - Stream names are case insensitive, while field names are case sensitive. 165 - Stream names are not validated by PyAirbyte. If the stream name 166 does not exist in the catalog, the override may be ignored. 167 """ 168 self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()}) 169 170 def set_primary_key( 171 self, 172 stream_name: str, 173 primary_key: str | list[str], 174 ) -> None: 175 """Set the primary key for a single stream. 176 177 Note: 178 - This does not unset previously set primary keys. 179 - The primary key must be a single field name or a list of field names. 180 - Not all streams support overriding primary keys. If a stream does not support overriding 181 primary keys, the override may be ignored. 182 - Stream names are case insensitive, while field names are case sensitive. 183 - Stream names are not validated by PyAirbyte. If the stream name 184 does not exist in the catalog, the override may be ignored. 185 """ 186 self._primary_key_overrides[stream_name.lower()] = ( 187 primary_key if isinstance(primary_key, list) else [primary_key] 188 ) 189 190 def set_primary_keys( 191 self, 192 **kwargs: str | list[str], 193 ) -> None: 194 """Override the primary keys for one or more streams. 195 196 This does not unset previously set primary keys. 197 198 Usage: 199 source.set_primary_keys( 200 stream1="pk1", 201 stream2=["pk1", "pk2"], 202 ) 203 204 Note: 205 - This does not unset previously set primary keys. 206 - The primary key must be a single field name or a list of field names. 207 - Not all streams support overriding primary keys. If a stream does not support overriding 208 primary keys, the override may be ignored. 209 - Stream names are case insensitive, while field names are case sensitive. 210 - Stream names are not validated by PyAirbyte. If the stream name 211 does not exist in the catalog, the override may be ignored. 212 """ 213 self._primary_key_overrides.update( 214 {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()} 215 ) 216 217 def _log_warning_preselected_stream(self, streams: str | list[str]) -> None: 218 """Logs a warning message indicating stream selection which are not selected yet.""" 219 if streams == "*": 220 print( 221 "Warning: Config is not set yet. All streams will be selected after config is set.", 222 file=sys.stderr, 223 ) 224 else: 225 print( 226 "Warning: Config is not set yet. " 227 f"Streams to be selected after config is set: {streams}", 228 file=sys.stderr, 229 ) 230 231 def select_all_streams(self) -> None: 232 """Select all streams. 233 234 This is a more streamlined equivalent to: 235 > source.select_streams(source.get_available_streams()). 236 """ 237 if self._config_dict is None: 238 self._to_be_selected_streams = "*" 239 self._log_warning_preselected_stream(self._to_be_selected_streams) 240 return 241 242 self._selected_stream_names = self.get_available_streams() 243 244 def select_streams(self, streams: str | list[str]) -> None: 245 """Select the stream names that should be read from the connector. 246 247 Args: 248 streams: A list of stream names to select. If set to "*", all streams will be selected. 249 250 Currently, if this is not set, all streams will be read. 251 """ 252 if self._config_dict is None: 253 self._to_be_selected_streams = streams 254 self._log_warning_preselected_stream(streams) 255 return 256 257 if streams == "*": 258 self.select_all_streams() 259 return 260 261 if isinstance(streams, str): 262 # If a single stream is provided, convert it to a one-item list 263 streams = [streams] 264 265 available_streams = self.get_available_streams() 266 for stream in streams: 267 if stream not in available_streams: 268 raise exc.AirbyteStreamNotFoundError( 269 stream_name=stream, 270 connector_name=self.name, 271 available_streams=available_streams, 272 ) 273 self._selected_stream_names = streams 274 275 def get_selected_streams(self) -> list[str]: 276 """Get the selected streams. 277 278 If no streams are selected, return an empty list. 279 """ 280 return self._selected_stream_names 281 282 def set_config( 283 self, 284 config: dict[str, Any], 285 *, 286 validate: bool = True, 287 ) -> None: 288 """Set the config for the connector. 289 290 If validate is True, raise an exception if the config fails validation. 291 292 If validate is False, validation will be deferred until check() or validate_config() 293 is called. 294 """ 295 if validate: 296 self.validate_config(config) 297 298 self._config_dict = config 299 300 if self._to_be_selected_streams: 301 self.select_streams(self._to_be_selected_streams) 302 self._to_be_selected_streams = [] 303 304 def _discover(self) -> AirbyteCatalog: 305 """Call discover on the connector. 306 307 This involves the following steps: 308 - Write the config to a temporary file 309 - execute the connector with discover --config <config_file> 310 - Listen to the messages and return the first AirbyteCatalog that comes along. 311 - Make sure the subprocess is killed when the function returns. 312 """ 313 with as_temp_files([self._hydrated_config]) as [config_file]: 314 for msg in self._execute(["discover", "--config", config_file]): 315 if msg.type == Type.CATALOG and msg.catalog: 316 return msg.catalog 317 raise exc.AirbyteConnectorMissingCatalogError( 318 connector_name=self.name, 319 log_text=self._last_log_messages, 320 ) 321 322 def get_available_streams(self) -> list[str]: 323 """Get the available streams from the spec.""" 324 return [s.name for s in self.discovered_catalog.streams] 325 326 def _get_incremental_stream_names(self) -> list[str]: 327 """Get the name of streams that support incremental sync.""" 328 return [ 329 stream.name 330 for stream in self.discovered_catalog.streams 331 if SyncMode.incremental in stream.supported_sync_modes 332 ] 333 334 @override 335 def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification: 336 """Call spec on the connector. 337 338 This involves the following steps: 339 * execute the connector with spec 340 * Listen to the messages and return the first AirbyteCatalog that comes along. 341 * Make sure the subprocess is killed when the function returns. 342 """ 343 if force_refresh or self._spec is None: 344 for msg in self._execute(["spec"]): 345 if msg.type == Type.SPEC and msg.spec: 346 self._spec = msg.spec 347 break 348 349 if self._spec: 350 return self._spec 351 352 raise exc.AirbyteConnectorMissingSpecError( 353 connector_name=self.name, 354 log_text=self._last_log_messages, 355 ) 356 357 @property 358 def config_spec(self) -> dict[str, Any]: 359 """Generate a configuration spec for this connector, as a JSON Schema definition. 360 361 This function generates a JSON Schema dictionary with configuration specs for the 362 current connector, as a dictionary. 363 364 Returns: 365 dict: The JSON Schema configuration spec as a dictionary. 366 """ 367 return self._get_spec(force_refresh=True).connectionSpecification 368 369 @property 370 def _yaml_spec(self) -> str: 371 """Get the spec as a yaml string. 372 373 For now, the primary use case is for writing and debugging a valid config for a source. 374 375 This is private for now because we probably want better polish before exposing this 376 as a stable interface. This will also get easier when we have docs links with this info 377 for each connector. 378 """ 379 spec_obj: ConnectorSpecification = self._get_spec() 380 spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True) 381 # convert to a yaml string 382 return yaml.dump(spec_dict) 383 384 @property 385 def docs_url(self) -> str: 386 """Get the URL to the connector's documentation.""" 387 return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace( 388 "source-", "" 389 ) 390 391 @property 392 def discovered_catalog(self) -> AirbyteCatalog: 393 """Get the raw catalog for the given streams. 394 395 If the catalog is not yet known, we call discover to get it. 396 """ 397 if self._discovered_catalog is None: 398 self._discovered_catalog = self._discover() 399 400 return self._discovered_catalog 401 402 @property 403 def configured_catalog(self) -> ConfiguredAirbyteCatalog: 404 """Get the configured catalog for the given streams. 405 406 If the raw catalog is not yet known, we call discover to get it. 407 408 If no specific streams are selected, we return a catalog that syncs all available streams. 409 410 TODO: We should consider disabling by default the streams that the connector would 411 disable by default. (For instance, streams that require a premium license are sometimes 412 disabled by default within the connector.) 413 """ 414 # Ensure discovered catalog is cached before we start 415 _ = self.discovered_catalog 416 417 # Filter for selected streams if set, otherwise use all available streams: 418 streams_filter: list[str] = self._selected_stream_names or self.get_available_streams() 419 return self.get_configured_catalog(streams=streams_filter) 420 421 def get_configured_catalog( 422 self, 423 streams: Literal["*"] | list[str] | None = None, 424 *, 425 force_full_refresh: bool = False, 426 ) -> ConfiguredAirbyteCatalog: 427 """Get a configured catalog for the given streams. 428 429 If no streams are provided, the selected streams will be used. If no streams are selected, 430 all available streams will be used. 431 432 If '*' is provided, all available streams will be used. 433 434 If force_full_refresh is True, streams will be configured with full_refresh sync mode 435 when supported by the stream. Otherwise, incremental sync mode is used when supported. 436 """ 437 selected_streams: list[str] = [] 438 if streams is None: 439 selected_streams = self._selected_stream_names or self.get_available_streams() 440 elif streams == "*": 441 selected_streams = self.get_available_streams() 442 elif isinstance(streams, list): 443 selected_streams = streams 444 else: 445 raise exc.PyAirbyteInputError( 446 message="Invalid streams argument.", 447 input_value=streams, 448 ) 449 450 def _get_sync_mode(stream: AirbyteStream) -> SyncMode: 451 """Determine the sync mode for a stream based on force_full_refresh and support.""" 452 # Use getattr to handle mocks or streams without supported_sync_modes attribute 453 supported_modes = getattr(stream, "supported_sync_modes", None) 454 455 if force_full_refresh: 456 # When force_full_refresh is True, prefer full_refresh if supported 457 if supported_modes and SyncMode.full_refresh in supported_modes: 458 return SyncMode.full_refresh 459 # Fall back to incremental if full_refresh is not supported 460 return SyncMode.incremental 461 462 # Default behavior: preserve previous semantics (always incremental) 463 return SyncMode.incremental 464 465 return ConfiguredAirbyteCatalog( 466 streams=[ 467 ConfiguredAirbyteStream( 468 stream=stream, 469 destination_sync_mode=DestinationSyncMode.overwrite, 470 sync_mode=_get_sync_mode(stream), 471 primary_key=( 472 [self._primary_key_overrides[stream.name.lower()]] 473 if stream.name.lower() in self._primary_key_overrides 474 else stream.source_defined_primary_key 475 ), 476 cursor_field=( 477 [self._cursor_key_overrides[stream.name.lower()]] 478 if stream.name.lower() in self._cursor_key_overrides 479 else stream.default_cursor_field 480 ), 481 # These are unused in the current implementation: 482 generation_id=None, 483 minimum_generation_id=None, 484 sync_id=None, 485 ) 486 for stream in self.discovered_catalog.streams 487 if stream.name in selected_streams 488 ], 489 ) 490 491 def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]: 492 """Return the JSON Schema spec for the specified stream name.""" 493 catalog: AirbyteCatalog = self.discovered_catalog 494 found: list[AirbyteStream] = [ 495 stream for stream in catalog.streams if stream.name == stream_name 496 ] 497 498 if len(found) == 0: 499 raise exc.PyAirbyteInputError( 500 message="Stream name does not exist in catalog.", 501 input_value=stream_name, 502 ) 503 504 if len(found) > 1: 505 raise exc.PyAirbyteInternalError( 506 message="Duplicate streams found with the same name.", 507 context={ 508 "found_streams": found, 509 }, 510 ) 511 512 return found[0].json_schema 513 514 def get_records( 515 self, 516 stream: str, 517 *, 518 limit: int | None = None, 519 stop_event: threading.Event | None = None, 520 normalize_field_names: bool = False, 521 prune_undeclared_fields: bool = True, 522 ) -> LazyDataset: 523 """Read a stream from the connector. 524 525 Args: 526 stream: The name of the stream to read. 527 limit: The maximum number of records to read. If None, all records will be read. 528 stop_event: If set, the event can be triggered by the caller to stop reading records 529 and terminate the process. 530 normalize_field_names: When `True`, field names will be normalized to lower case, with 531 special characters removed. This matches the behavior of PyAirbyte caches and most 532 Airbyte destinations. 533 prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records, 534 which generally matches the behavior of PyAirbyte caches and most Airbyte 535 destinations, specifically when you expect the catalog may be stale. You can disable 536 this to keep all fields in the records. 537 538 This involves the following steps: 539 * Call discover to get the catalog 540 * Generate a configured catalog that syncs the given stream in full_refresh mode 541 * Write the configured catalog and the config to a temporary file 542 * execute the connector with read --config <config_file> --catalog <catalog_file> 543 * Listen to the messages and return the first AirbyteRecordMessages that come along. 544 * Make sure the subprocess is killed when the function returns. 545 """ 546 stop_event = stop_event or threading.Event() 547 configured_catalog = self.get_configured_catalog(streams=[stream]) 548 if len(configured_catalog.streams) == 0: 549 raise exc.PyAirbyteInputError( 550 message="Requested stream does not exist.", 551 context={ 552 "stream": stream, 553 "available_streams": self.get_available_streams(), 554 "connector_name": self.name, 555 }, 556 ) from KeyError(stream) 557 558 configured_stream = configured_catalog.streams[0] 559 560 def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]: 561 yield from records 562 563 stream_record_handler = StreamRecordHandler( 564 json_schema=self.get_stream_json_schema(stream), 565 prune_extra_fields=prune_undeclared_fields, 566 normalize_keys=normalize_field_names, 567 ) 568 569 # This method is non-blocking, so we use "PLAIN" to avoid a live progress display 570 progress_tracker = ProgressTracker( 571 ProgressStyle.PLAIN, 572 source=self, 573 cache=None, 574 destination=None, 575 expected_streams=[stream], 576 ) 577 578 iterator: Iterator[dict[str, Any]] = ( 579 StreamRecord.from_record_message( 580 record_message=record.record, 581 stream_record_handler=stream_record_handler, 582 ) 583 for record in self._read_with_catalog( 584 catalog=configured_catalog, 585 progress_tracker=progress_tracker, 586 stop_event=stop_event, 587 ) 588 if record.record 589 ) 590 if limit is not None: 591 # Stop the iterator after the limit is reached 592 iterator = islice(iterator, limit) 593 594 return LazyDataset( 595 iterator, 596 stream_metadata=configured_stream, 597 stop_event=stop_event, 598 progress_tracker=progress_tracker, 599 ) 600 601 def get_documents( 602 self, 603 stream: str, 604 title_property: str | None = None, 605 content_properties: list[str] | None = None, 606 metadata_properties: list[str] | None = None, 607 *, 608 render_metadata: bool = False, 609 ) -> Iterable[Document]: 610 """Read a stream from the connector and return the records as documents. 611 612 If metadata_properties is not set, all properties that are not content will be added to 613 the metadata. 614 615 If render_metadata is True, metadata will be rendered in the document, as well as the 616 the main content. 617 """ 618 return self.get_records(stream).to_documents( 619 title_property=title_property, 620 content_properties=content_properties, 621 metadata_properties=metadata_properties, 622 render_metadata=render_metadata, 623 ) 624 625 def get_samples( 626 self, 627 streams: list[str] | Literal["*"] | None = None, 628 *, 629 limit: int = 5, 630 on_error: Literal["raise", "ignore", "log"] = "raise", 631 ) -> dict[str, InMemoryDataset | None]: 632 """Get a sample of records from the given streams.""" 633 if streams == "*": 634 streams = self.get_available_streams() 635 elif streams is None: 636 streams = self.get_selected_streams() 637 638 results: dict[str, InMemoryDataset | None] = {} 639 for stream in streams: 640 stop_event = threading.Event() 641 try: 642 results[stream] = self.get_records( 643 stream, 644 limit=limit, 645 stop_event=stop_event, 646 ).fetch_all() 647 stop_event.set() 648 except Exception as ex: 649 results[stream] = None 650 if on_error == "ignore": 651 continue 652 653 if on_error == "raise": 654 raise ex from None 655 656 if on_error == "log": 657 print(f"Error fetching sample for stream '{stream}': {ex}") 658 659 return results 660 661 def print_samples( 662 self, 663 streams: list[str] | Literal["*"] | None = None, 664 *, 665 limit: int = 5, 666 on_error: Literal["raise", "ignore", "log"] = "log", 667 ) -> None: 668 """Print a sample of records from the given streams.""" 669 internal_cols: list[str] = [ 670 AB_EXTRACTED_AT_COLUMN, 671 AB_META_COLUMN, 672 AB_RAW_ID_COLUMN, 673 ] 674 col_limit = 10 675 if streams == "*": 676 streams = self.get_available_streams() 677 elif streams is None: 678 streams = self.get_selected_streams() 679 680 console = Console() 681 682 console.print( 683 Markdown( 684 f"# Sample Records from `{self.name}` ({len(streams)} selected streams)", 685 justify="left", 686 ) 687 ) 688 689 for stream in streams: 690 console.print(Markdown(f"## `{stream}` Stream Sample", justify="left")) 691 samples = self.get_samples( 692 streams=[stream], 693 limit=limit, 694 on_error=on_error, 695 ) 696 dataset = samples[stream] 697 698 table = Table( 699 show_header=True, 700 show_lines=True, 701 ) 702 if dataset is None: 703 console.print( 704 Markdown("**⚠️ `Error fetching sample records.` ⚠️**"), 705 ) 706 continue 707 708 if len(dataset.column_names) > col_limit: 709 # We'll pivot the columns so each column is its own row 710 table.add_column("Column Name") 711 for _ in range(len(dataset)): 712 table.add_column(overflow="fold") 713 for col in dataset.column_names: 714 table.add_row( 715 Markdown(f"**`{col}`**"), 716 *[escape(str(record[col])) for record in dataset], 717 ) 718 else: 719 for col in dataset.column_names: 720 table.add_column( 721 Markdown(f"**`{col}`**"), 722 overflow="fold", 723 ) 724 725 for record in dataset: 726 table.add_row( 727 *[ 728 escape(str(val)) 729 for key, val in record.items() 730 # Exclude internal Airbyte columns. 731 if key not in internal_cols 732 ] 733 ) 734 735 console.print(table) 736 737 console.print(Markdown("--------------")) 738 739 def _get_airbyte_message_iterator( 740 self, 741 *, 742 streams: Literal["*"] | list[str] | None = None, 743 state_provider: StateProviderBase | None = None, 744 progress_tracker: ProgressTracker, 745 force_full_refresh: bool = False, 746 ) -> AirbyteMessageIterator: 747 """Get an AirbyteMessageIterator for this source.""" 748 return AirbyteMessageIterator( 749 self._read_with_catalog( 750 catalog=self.get_configured_catalog( 751 streams=streams, 752 force_full_refresh=force_full_refresh, 753 ), 754 state=state_provider if not force_full_refresh else None, 755 progress_tracker=progress_tracker, 756 ) 757 ) 758 759 def _read_with_catalog( 760 self, 761 catalog: ConfiguredAirbyteCatalog, 762 progress_tracker: ProgressTracker, 763 *, 764 state: StateProviderBase | None = None, 765 stop_event: threading.Event | None = None, 766 ) -> Generator[AirbyteMessage, None, None]: 767 """Call read on the connector. 768 769 This involves the following steps: 770 * Write the config to a temporary file 771 * execute the connector with read --config <config_file> --catalog <catalog_file> 772 * Listen to the messages and return the AirbyteRecordMessages that come along. 773 * Send out telemetry on the performed sync (with information about which source was used and 774 the type of the cache) 775 """ 776 with as_temp_files( 777 [ 778 self._hydrated_config, 779 catalog.model_dump_json(exclude_none=True), 780 state.to_state_input_file_text() if state else "[]", 781 ] 782 ) as [ 783 config_file, 784 catalog_file, 785 state_file, 786 ]: 787 message_generator = self._execute( 788 [ 789 "read", 790 "--config", 791 config_file, 792 "--catalog", 793 catalog_file, 794 "--state", 795 state_file, 796 ], 797 progress_tracker=progress_tracker, 798 ) 799 for message in progress_tracker.tally_records_read(message_generator): 800 if stop_event and stop_event.is_set(): 801 progress_tracker._log_sync_cancel() # noqa: SLF001 802 time.sleep(0.1) 803 return 804 805 yield message 806 807 progress_tracker.log_read_complete() 808 809 def _peek_airbyte_message( 810 self, 811 message: AirbyteMessage, 812 *, 813 raise_on_error: bool = True, 814 ) -> None: 815 """Process an Airbyte message. 816 817 This method handles reading Airbyte messages and taking action, if needed, based on the 818 message type. For instance, log messages are logged, records are tallied, and errors are 819 raised as exceptions if `raise_on_error` is True. 820 821 Raises: 822 AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted. 823 """ 824 super()._peek_airbyte_message(message, raise_on_error=raise_on_error) 825 826 def _log_incremental_streams( 827 self, 828 *, 829 incremental_streams: set[str] | None = None, 830 ) -> None: 831 """Log the streams which are using incremental sync mode.""" 832 log_message = ( 833 "The following streams are currently using incremental sync:\n" 834 f"{incremental_streams}\n" 835 "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method." 836 ) 837 print(log_message, file=sys.stderr) 838 839 def read( 840 self, 841 cache: CacheBase | None = None, 842 *, 843 streams: str | list[str] | None = None, 844 write_strategy: str | WriteStrategy = WriteStrategy.AUTO, 845 force_full_refresh: bool = False, 846 skip_validation: bool = False, 847 ) -> ReadResult: 848 """Read from the connector and write to the cache. 849 850 Args: 851 cache: The cache to write to. If not set, a default cache will be used. 852 streams: Optional if already set. A list of stream names to select for reading. If set 853 to "*", all streams will be selected. 854 write_strategy: The strategy to use when writing to the cache. If a string, it must be 855 one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one 856 of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or 857 WriteStrategy.AUTO. 858 force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, 859 streams will be read in incremental mode if supported by the connector. This option 860 must be True when using the "replace" strategy. 861 skip_validation: If True, PyAirbyte will not pre-validate the input configuration before 862 running the connector. This can be helpful in debugging, when you want to send 863 configurations to the connector that otherwise might be rejected by JSON Schema 864 validation rules. 865 """ 866 cache = cache or get_default_cache() 867 progress_tracker = ProgressTracker( 868 source=self, 869 cache=cache, 870 destination=None, 871 expected_streams=None, # Will be set later 872 ) 873 874 # Set up state provider if not in full refresh mode 875 if force_full_refresh: 876 state_provider: StateProviderBase | None = None 877 else: 878 state_provider = cache.get_state_provider( 879 source_name=self._name, 880 ) 881 state_writer = cache.get_state_writer(source_name=self._name) 882 883 if streams: 884 self.select_streams(streams) 885 886 if not self._selected_stream_names: 887 raise exc.PyAirbyteNoStreamsSelectedError( 888 connector_name=self.name, 889 available_streams=self.get_available_streams(), 890 ) 891 892 try: 893 result = self._read_to_cache( 894 cache=cache, 895 catalog_provider=CatalogProvider( 896 self.get_configured_catalog(force_full_refresh=force_full_refresh) 897 ), 898 stream_names=self._selected_stream_names, 899 state_provider=state_provider, 900 state_writer=state_writer, 901 write_strategy=write_strategy, 902 force_full_refresh=force_full_refresh, 903 skip_validation=skip_validation, 904 progress_tracker=progress_tracker, 905 ) 906 except exc.PyAirbyteInternalError as ex: 907 progress_tracker.log_failure(exception=ex) 908 raise exc.AirbyteConnectorFailedError( 909 connector_name=self.name, 910 log_text=self._last_log_messages, 911 ) from ex 912 except Exception as ex: 913 progress_tracker.log_failure(exception=ex) 914 raise 915 916 progress_tracker.log_success() 917 return result 918 919 def _read_to_cache( # noqa: PLR0913 # Too many arguments 920 self, 921 cache: CacheBase, 922 *, 923 catalog_provider: CatalogProvider, 924 stream_names: list[str], 925 state_provider: StateProviderBase | None, 926 state_writer: StateWriterBase | None, 927 write_strategy: str | WriteStrategy = WriteStrategy.AUTO, 928 force_full_refresh: bool = False, 929 skip_validation: bool = False, 930 progress_tracker: ProgressTracker, 931 ) -> ReadResult: 932 """Internal read method.""" 933 if write_strategy == WriteStrategy.REPLACE and not force_full_refresh: 934 warnings.warn( 935 message=( 936 "Using `REPLACE` strategy without also setting `force_full_refresh=True` " 937 "could result in data loss. " 938 "To silence this warning, use the following: " 939 'warnings.filterwarnings("ignore", ' 940 'category="airbyte.warnings.PyAirbyteDataLossWarning")`' 941 ), 942 category=exc.PyAirbyteDataLossWarning, 943 stacklevel=1, 944 ) 945 if isinstance(write_strategy, str): 946 try: 947 write_strategy = WriteStrategy(write_strategy) 948 except ValueError: 949 raise exc.PyAirbyteInputError( 950 message="Invalid strategy", 951 context={ 952 "write_strategy": write_strategy, 953 "available_strategies": [ 954 s.value 955 for s in WriteStrategy # pyrefly: ignore[not-iterable] 956 ], 957 }, 958 ) from None 959 960 # Run optional validation step 961 if not skip_validation: 962 self.validate_config() 963 964 # Log incremental stream if incremental streams are known 965 if state_provider and state_provider.known_stream_names: 966 # Retrieve set of the known streams support which support incremental sync 967 incremental_streams = ( 968 set(self._get_incremental_stream_names()) 969 & state_provider.known_stream_names 970 & set(self.get_selected_streams()) 971 ) 972 if incremental_streams: 973 self._log_incremental_streams(incremental_streams=incremental_streams) 974 975 airbyte_message_iterator = AirbyteMessageIterator( 976 self._read_with_catalog( 977 catalog=catalog_provider.configured_catalog, 978 state=state_provider, 979 progress_tracker=progress_tracker, 980 ) 981 ) 982 cache._write_airbyte_message_stream( # noqa: SLF001 # Non-public API 983 stdin=airbyte_message_iterator, 984 catalog_provider=catalog_provider, 985 write_strategy=write_strategy, 986 state_writer=state_writer, 987 progress_tracker=progress_tracker, 988 ) 989 990 # Flush the WAL, if applicable 991 cache.processor._do_checkpoint() # noqa: SLF001 # Non-public API 992 993 return ReadResult( 994 source_name=self.name, 995 progress_tracker=progress_tracker, 996 processed_streams=stream_names, 997 cache=cache, 998 )
A class representing a source that can be called.
73 def __init__( 74 self, 75 executor: Executor, 76 name: str, 77 config: dict[str, Any] | None = None, 78 *, 79 config_change_callback: ConfigChangeCallback | None = None, 80 streams: str | list[str] | None = None, 81 validate: bool = False, 82 cursor_key_overrides: dict[str, str] | None = None, 83 primary_key_overrides: dict[str, str | list[str]] | None = None, 84 ) -> None: 85 """Initialize the source. 86 87 If config is provided, it will be validated against the spec if validate is True. 88 """ 89 self._to_be_selected_streams: list[str] | str = [] 90 """Used to hold selection criteria before catalog is known.""" 91 92 super().__init__( 93 executor=executor, 94 name=name, 95 config=config, 96 config_change_callback=config_change_callback, 97 validate=validate, 98 ) 99 self._config_dict: dict[str, Any] | None = None 100 self._last_log_messages: list[str] = [] 101 self._discovered_catalog: AirbyteCatalog | None = None 102 self._selected_stream_names: list[str] = [] 103 104 self._cursor_key_overrides: dict[str, str] = {} 105 """A mapping of lower-cased stream names to cursor key overrides.""" 106 107 self._primary_key_overrides: dict[str, list[str]] = {} 108 """A mapping of lower-cased stream names to primary key overrides.""" 109 110 if config is not None: 111 self.set_config(config, validate=validate) 112 if streams is not None: 113 self.select_streams(streams) 114 if cursor_key_overrides is not None: 115 self.set_cursor_keys(**cursor_key_overrides) 116 if primary_key_overrides is not None: 117 self.set_primary_keys(**primary_key_overrides)
Initialize the source.
If config is provided, it will be validated against the spec if validate is True.
119 def set_streams(self, streams: list[str]) -> None: 120 """Deprecated. See select_streams().""" 121 warnings.warn( 122 "The 'set_streams' method is deprecated and will be removed in a future version. " 123 "Please use the 'select_streams' method instead.", 124 DeprecationWarning, 125 stacklevel=2, 126 ) 127 self.select_streams(streams)
Deprecated. See select_streams().
129 def set_cursor_key( 130 self, 131 stream_name: str, 132 cursor_key: str, 133 ) -> None: 134 """Set the cursor for a single stream. 135 136 Note: 137 - This does not unset previously set cursors. 138 - The cursor key must be a single field name. 139 - Not all streams support custom cursors. If a stream does not support custom cursors, 140 the override may be ignored. 141 - Stream names are case insensitive, while field names are case sensitive. 142 - Stream names are not validated by PyAirbyte. If the stream name 143 does not exist in the catalog, the override may be ignored. 144 """ 145 self._cursor_key_overrides[stream_name.lower()] = cursor_key
Set the cursor for a single stream.
Note:
- This does not unset previously set cursors.
- The cursor key must be a single field name.
- Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
147 def set_cursor_keys( 148 self, 149 **kwargs: str, 150 ) -> None: 151 """Override the cursor key for one or more streams. 152 153 Usage: 154 source.set_cursor_keys( 155 stream1="cursor1", 156 stream2="cursor2", 157 ) 158 159 Note: 160 - This does not unset previously set cursors. 161 - The cursor key must be a single field name. 162 - Not all streams support custom cursors. If a stream does not support custom cursors, 163 the override may be ignored. 164 - Stream names are case insensitive, while field names are case sensitive. 165 - Stream names are not validated by PyAirbyte. If the stream name 166 does not exist in the catalog, the override may be ignored. 167 """ 168 self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})
Override the cursor key for one or more streams.
Usage:
source.set_cursor_keys( stream1="cursor1", stream2="cursor2", )
Note:
- This does not unset previously set cursors.
- The cursor key must be a single field name.
- Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
170 def set_primary_key( 171 self, 172 stream_name: str, 173 primary_key: str | list[str], 174 ) -> None: 175 """Set the primary key for a single stream. 176 177 Note: 178 - This does not unset previously set primary keys. 179 - The primary key must be a single field name or a list of field names. 180 - Not all streams support overriding primary keys. If a stream does not support overriding 181 primary keys, the override may be ignored. 182 - Stream names are case insensitive, while field names are case sensitive. 183 - Stream names are not validated by PyAirbyte. If the stream name 184 does not exist in the catalog, the override may be ignored. 185 """ 186 self._primary_key_overrides[stream_name.lower()] = ( 187 primary_key if isinstance(primary_key, list) else [primary_key] 188 )
Set the primary key for a single stream.
Note:
- This does not unset previously set primary keys.
- The primary key must be a single field name or a list of field names.
- Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
190 def set_primary_keys( 191 self, 192 **kwargs: str | list[str], 193 ) -> None: 194 """Override the primary keys for one or more streams. 195 196 This does not unset previously set primary keys. 197 198 Usage: 199 source.set_primary_keys( 200 stream1="pk1", 201 stream2=["pk1", "pk2"], 202 ) 203 204 Note: 205 - This does not unset previously set primary keys. 206 - The primary key must be a single field name or a list of field names. 207 - Not all streams support overriding primary keys. If a stream does not support overriding 208 primary keys, the override may be ignored. 209 - Stream names are case insensitive, while field names are case sensitive. 210 - Stream names are not validated by PyAirbyte. If the stream name 211 does not exist in the catalog, the override may be ignored. 212 """ 213 self._primary_key_overrides.update( 214 {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()} 215 )
Override the primary keys for one or more streams.
This does not unset previously set primary keys.
Usage:
source.set_primary_keys( stream1="pk1", stream2=["pk1", "pk2"], )
Note:
- This does not unset previously set primary keys.
- The primary key must be a single field name or a list of field names.
- Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
231 def select_all_streams(self) -> None: 232 """Select all streams. 233 234 This is a more streamlined equivalent to: 235 > source.select_streams(source.get_available_streams()). 236 """ 237 if self._config_dict is None: 238 self._to_be_selected_streams = "*" 239 self._log_warning_preselected_stream(self._to_be_selected_streams) 240 return 241 242 self._selected_stream_names = self.get_available_streams()
Select all streams.
This is a more streamlined equivalent to:
source.select_streams(source.get_available_streams()).
244 def select_streams(self, streams: str | list[str]) -> None: 245 """Select the stream names that should be read from the connector. 246 247 Args: 248 streams: A list of stream names to select. If set to "*", all streams will be selected. 249 250 Currently, if this is not set, all streams will be read. 251 """ 252 if self._config_dict is None: 253 self._to_be_selected_streams = streams 254 self._log_warning_preselected_stream(streams) 255 return 256 257 if streams == "*": 258 self.select_all_streams() 259 return 260 261 if isinstance(streams, str): 262 # If a single stream is provided, convert it to a one-item list 263 streams = [streams] 264 265 available_streams = self.get_available_streams() 266 for stream in streams: 267 if stream not in available_streams: 268 raise exc.AirbyteStreamNotFoundError( 269 stream_name=stream, 270 connector_name=self.name, 271 available_streams=available_streams, 272 ) 273 self._selected_stream_names = streams
Select the stream names that should be read from the connector.
Arguments:
- streams: A list of stream names to select. If set to "*", all streams will be selected.
Currently, if this is not set, all streams will be read.
275 def get_selected_streams(self) -> list[str]: 276 """Get the selected streams. 277 278 If no streams are selected, return an empty list. 279 """ 280 return self._selected_stream_names
Get the selected streams.
If no streams are selected, return an empty list.
282 def set_config( 283 self, 284 config: dict[str, Any], 285 *, 286 validate: bool = True, 287 ) -> None: 288 """Set the config for the connector. 289 290 If validate is True, raise an exception if the config fails validation. 291 292 If validate is False, validation will be deferred until check() or validate_config() 293 is called. 294 """ 295 if validate: 296 self.validate_config(config) 297 298 self._config_dict = config 299 300 if self._to_be_selected_streams: 301 self.select_streams(self._to_be_selected_streams) 302 self._to_be_selected_streams = []
Set the config for the connector.
If validate is True, raise an exception if the config fails validation.
If validate is False, validation will be deferred until check() or validate_config() is called.
322 def get_available_streams(self) -> list[str]: 323 """Get the available streams from the spec.""" 324 return [s.name for s in self.discovered_catalog.streams]
Get the available streams from the spec.
357 @property 358 def config_spec(self) -> dict[str, Any]: 359 """Generate a configuration spec for this connector, as a JSON Schema definition. 360 361 This function generates a JSON Schema dictionary with configuration specs for the 362 current connector, as a dictionary. 363 364 Returns: 365 dict: The JSON Schema configuration spec as a dictionary. 366 """ 367 return self._get_spec(force_refresh=True).connectionSpecification
Generate a configuration spec for this connector, as a JSON Schema definition.
This function generates a JSON Schema dictionary with configuration specs for the current connector, as a dictionary.
Returns:
dict: The JSON Schema configuration spec as a dictionary.
384 @property 385 def docs_url(self) -> str: 386 """Get the URL to the connector's documentation.""" 387 return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace( 388 "source-", "" 389 )
Get the URL to the connector's documentation.
391 @property 392 def discovered_catalog(self) -> AirbyteCatalog: 393 """Get the raw catalog for the given streams. 394 395 If the catalog is not yet known, we call discover to get it. 396 """ 397 if self._discovered_catalog is None: 398 self._discovered_catalog = self._discover() 399 400 return self._discovered_catalog
Get the raw catalog for the given streams.
If the catalog is not yet known, we call discover to get it.
402 @property 403 def configured_catalog(self) -> ConfiguredAirbyteCatalog: 404 """Get the configured catalog for the given streams. 405 406 If the raw catalog is not yet known, we call discover to get it. 407 408 If no specific streams are selected, we return a catalog that syncs all available streams. 409 410 TODO: We should consider disabling by default the streams that the connector would 411 disable by default. (For instance, streams that require a premium license are sometimes 412 disabled by default within the connector.) 413 """ 414 # Ensure discovered catalog is cached before we start 415 _ = self.discovered_catalog 416 417 # Filter for selected streams if set, otherwise use all available streams: 418 streams_filter: list[str] = self._selected_stream_names or self.get_available_streams() 419 return self.get_configured_catalog(streams=streams_filter)
Get the configured catalog for the given streams.
If the raw catalog is not yet known, we call discover to get it.
If no specific streams are selected, we return a catalog that syncs all available streams.
TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)
421 def get_configured_catalog( 422 self, 423 streams: Literal["*"] | list[str] | None = None, 424 *, 425 force_full_refresh: bool = False, 426 ) -> ConfiguredAirbyteCatalog: 427 """Get a configured catalog for the given streams. 428 429 If no streams are provided, the selected streams will be used. If no streams are selected, 430 all available streams will be used. 431 432 If '*' is provided, all available streams will be used. 433 434 If force_full_refresh is True, streams will be configured with full_refresh sync mode 435 when supported by the stream. Otherwise, incremental sync mode is used when supported. 436 """ 437 selected_streams: list[str] = [] 438 if streams is None: 439 selected_streams = self._selected_stream_names or self.get_available_streams() 440 elif streams == "*": 441 selected_streams = self.get_available_streams() 442 elif isinstance(streams, list): 443 selected_streams = streams 444 else: 445 raise exc.PyAirbyteInputError( 446 message="Invalid streams argument.", 447 input_value=streams, 448 ) 449 450 def _get_sync_mode(stream: AirbyteStream) -> SyncMode: 451 """Determine the sync mode for a stream based on force_full_refresh and support.""" 452 # Use getattr to handle mocks or streams without supported_sync_modes attribute 453 supported_modes = getattr(stream, "supported_sync_modes", None) 454 455 if force_full_refresh: 456 # When force_full_refresh is True, prefer full_refresh if supported 457 if supported_modes and SyncMode.full_refresh in supported_modes: 458 return SyncMode.full_refresh 459 # Fall back to incremental if full_refresh is not supported 460 return SyncMode.incremental 461 462 # Default behavior: preserve previous semantics (always incremental) 463 return SyncMode.incremental 464 465 return ConfiguredAirbyteCatalog( 466 streams=[ 467 ConfiguredAirbyteStream( 468 stream=stream, 469 destination_sync_mode=DestinationSyncMode.overwrite, 470 sync_mode=_get_sync_mode(stream), 471 primary_key=( 472 [self._primary_key_overrides[stream.name.lower()]] 473 if stream.name.lower() in self._primary_key_overrides 474 else stream.source_defined_primary_key 475 ), 476 cursor_field=( 477 [self._cursor_key_overrides[stream.name.lower()]] 478 if stream.name.lower() in self._cursor_key_overrides 479 else stream.default_cursor_field 480 ), 481 # These are unused in the current implementation: 482 generation_id=None, 483 minimum_generation_id=None, 484 sync_id=None, 485 ) 486 for stream in self.discovered_catalog.streams 487 if stream.name in selected_streams 488 ], 489 )
Get a configured catalog for the given streams.
If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.
If '*' is provided, all available streams will be used.
If force_full_refresh is True, streams will be configured with full_refresh sync mode when supported by the stream. Otherwise, incremental sync mode is used when supported.
491 def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]: 492 """Return the JSON Schema spec for the specified stream name.""" 493 catalog: AirbyteCatalog = self.discovered_catalog 494 found: list[AirbyteStream] = [ 495 stream for stream in catalog.streams if stream.name == stream_name 496 ] 497 498 if len(found) == 0: 499 raise exc.PyAirbyteInputError( 500 message="Stream name does not exist in catalog.", 501 input_value=stream_name, 502 ) 503 504 if len(found) > 1: 505 raise exc.PyAirbyteInternalError( 506 message="Duplicate streams found with the same name.", 507 context={ 508 "found_streams": found, 509 }, 510 ) 511 512 return found[0].json_schema
Return the JSON Schema spec for the specified stream name.
514 def get_records( 515 self, 516 stream: str, 517 *, 518 limit: int | None = None, 519 stop_event: threading.Event | None = None, 520 normalize_field_names: bool = False, 521 prune_undeclared_fields: bool = True, 522 ) -> LazyDataset: 523 """Read a stream from the connector. 524 525 Args: 526 stream: The name of the stream to read. 527 limit: The maximum number of records to read. If None, all records will be read. 528 stop_event: If set, the event can be triggered by the caller to stop reading records 529 and terminate the process. 530 normalize_field_names: When `True`, field names will be normalized to lower case, with 531 special characters removed. This matches the behavior of PyAirbyte caches and most 532 Airbyte destinations. 533 prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records, 534 which generally matches the behavior of PyAirbyte caches and most Airbyte 535 destinations, specifically when you expect the catalog may be stale. You can disable 536 this to keep all fields in the records. 537 538 This involves the following steps: 539 * Call discover to get the catalog 540 * Generate a configured catalog that syncs the given stream in full_refresh mode 541 * Write the configured catalog and the config to a temporary file 542 * execute the connector with read --config <config_file> --catalog <catalog_file> 543 * Listen to the messages and return the first AirbyteRecordMessages that come along. 544 * Make sure the subprocess is killed when the function returns. 545 """ 546 stop_event = stop_event or threading.Event() 547 configured_catalog = self.get_configured_catalog(streams=[stream]) 548 if len(configured_catalog.streams) == 0: 549 raise exc.PyAirbyteInputError( 550 message="Requested stream does not exist.", 551 context={ 552 "stream": stream, 553 "available_streams": self.get_available_streams(), 554 "connector_name": self.name, 555 }, 556 ) from KeyError(stream) 557 558 configured_stream = configured_catalog.streams[0] 559 560 def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]: 561 yield from records 562 563 stream_record_handler = StreamRecordHandler( 564 json_schema=self.get_stream_json_schema(stream), 565 prune_extra_fields=prune_undeclared_fields, 566 normalize_keys=normalize_field_names, 567 ) 568 569 # This method is non-blocking, so we use "PLAIN" to avoid a live progress display 570 progress_tracker = ProgressTracker( 571 ProgressStyle.PLAIN, 572 source=self, 573 cache=None, 574 destination=None, 575 expected_streams=[stream], 576 ) 577 578 iterator: Iterator[dict[str, Any]] = ( 579 StreamRecord.from_record_message( 580 record_message=record.record, 581 stream_record_handler=stream_record_handler, 582 ) 583 for record in self._read_with_catalog( 584 catalog=configured_catalog, 585 progress_tracker=progress_tracker, 586 stop_event=stop_event, 587 ) 588 if record.record 589 ) 590 if limit is not None: 591 # Stop the iterator after the limit is reached 592 iterator = islice(iterator, limit) 593 594 return LazyDataset( 595 iterator, 596 stream_metadata=configured_stream, 597 stop_event=stop_event, 598 progress_tracker=progress_tracker, 599 )
Read a stream from the connector.
Arguments:
- stream: The name of the stream to read.
- limit: The maximum number of records to read. If None, all records will be read.
- stop_event: If set, the event can be triggered by the caller to stop reading records and terminate the process.
- normalize_field_names: When
True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations. - prune_undeclared_fields: When
True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.
This involves the following steps:
- Call discover to get the catalog
- Generate a configured catalog that syncs the given stream in full_refresh mode
- Write the configured catalog and the config to a temporary file
- execute the connector with read --config
--catalog - Listen to the messages and return the first AirbyteRecordMessages that come along.
- Make sure the subprocess is killed when the function returns.
601 def get_documents( 602 self, 603 stream: str, 604 title_property: str | None = None, 605 content_properties: list[str] | None = None, 606 metadata_properties: list[str] | None = None, 607 *, 608 render_metadata: bool = False, 609 ) -> Iterable[Document]: 610 """Read a stream from the connector and return the records as documents. 611 612 If metadata_properties is not set, all properties that are not content will be added to 613 the metadata. 614 615 If render_metadata is True, metadata will be rendered in the document, as well as the 616 the main content. 617 """ 618 return self.get_records(stream).to_documents( 619 title_property=title_property, 620 content_properties=content_properties, 621 metadata_properties=metadata_properties, 622 render_metadata=render_metadata, 623 )
Read a stream from the connector and return the records as documents.
If metadata_properties is not set, all properties that are not content will be added to the metadata.
If render_metadata is True, metadata will be rendered in the document, as well as the the main content.
625 def get_samples( 626 self, 627 streams: list[str] | Literal["*"] | None = None, 628 *, 629 limit: int = 5, 630 on_error: Literal["raise", "ignore", "log"] = "raise", 631 ) -> dict[str, InMemoryDataset | None]: 632 """Get a sample of records from the given streams.""" 633 if streams == "*": 634 streams = self.get_available_streams() 635 elif streams is None: 636 streams = self.get_selected_streams() 637 638 results: dict[str, InMemoryDataset | None] = {} 639 for stream in streams: 640 stop_event = threading.Event() 641 try: 642 results[stream] = self.get_records( 643 stream, 644 limit=limit, 645 stop_event=stop_event, 646 ).fetch_all() 647 stop_event.set() 648 except Exception as ex: 649 results[stream] = None 650 if on_error == "ignore": 651 continue 652 653 if on_error == "raise": 654 raise ex from None 655 656 if on_error == "log": 657 print(f"Error fetching sample for stream '{stream}': {ex}") 658 659 return results
Get a sample of records from the given streams.
661 def print_samples( 662 self, 663 streams: list[str] | Literal["*"] | None = None, 664 *, 665 limit: int = 5, 666 on_error: Literal["raise", "ignore", "log"] = "log", 667 ) -> None: 668 """Print a sample of records from the given streams.""" 669 internal_cols: list[str] = [ 670 AB_EXTRACTED_AT_COLUMN, 671 AB_META_COLUMN, 672 AB_RAW_ID_COLUMN, 673 ] 674 col_limit = 10 675 if streams == "*": 676 streams = self.get_available_streams() 677 elif streams is None: 678 streams = self.get_selected_streams() 679 680 console = Console() 681 682 console.print( 683 Markdown( 684 f"# Sample Records from `{self.name}` ({len(streams)} selected streams)", 685 justify="left", 686 ) 687 ) 688 689 for stream in streams: 690 console.print(Markdown(f"## `{stream}` Stream Sample", justify="left")) 691 samples = self.get_samples( 692 streams=[stream], 693 limit=limit, 694 on_error=on_error, 695 ) 696 dataset = samples[stream] 697 698 table = Table( 699 show_header=True, 700 show_lines=True, 701 ) 702 if dataset is None: 703 console.print( 704 Markdown("**⚠️ `Error fetching sample records.` ⚠️**"), 705 ) 706 continue 707 708 if len(dataset.column_names) > col_limit: 709 # We'll pivot the columns so each column is its own row 710 table.add_column("Column Name") 711 for _ in range(len(dataset)): 712 table.add_column(overflow="fold") 713 for col in dataset.column_names: 714 table.add_row( 715 Markdown(f"**`{col}`**"), 716 *[escape(str(record[col])) for record in dataset], 717 ) 718 else: 719 for col in dataset.column_names: 720 table.add_column( 721 Markdown(f"**`{col}`**"), 722 overflow="fold", 723 ) 724 725 for record in dataset: 726 table.add_row( 727 *[ 728 escape(str(val)) 729 for key, val in record.items() 730 # Exclude internal Airbyte columns. 731 if key not in internal_cols 732 ] 733 ) 734 735 console.print(table) 736 737 console.print(Markdown("--------------"))
Print a sample of records from the given streams.
839 def read( 840 self, 841 cache: CacheBase | None = None, 842 *, 843 streams: str | list[str] | None = None, 844 write_strategy: str | WriteStrategy = WriteStrategy.AUTO, 845 force_full_refresh: bool = False, 846 skip_validation: bool = False, 847 ) -> ReadResult: 848 """Read from the connector and write to the cache. 849 850 Args: 851 cache: The cache to write to. If not set, a default cache will be used. 852 streams: Optional if already set. A list of stream names to select for reading. If set 853 to "*", all streams will be selected. 854 write_strategy: The strategy to use when writing to the cache. If a string, it must be 855 one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one 856 of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or 857 WriteStrategy.AUTO. 858 force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, 859 streams will be read in incremental mode if supported by the connector. This option 860 must be True when using the "replace" strategy. 861 skip_validation: If True, PyAirbyte will not pre-validate the input configuration before 862 running the connector. This can be helpful in debugging, when you want to send 863 configurations to the connector that otherwise might be rejected by JSON Schema 864 validation rules. 865 """ 866 cache = cache or get_default_cache() 867 progress_tracker = ProgressTracker( 868 source=self, 869 cache=cache, 870 destination=None, 871 expected_streams=None, # Will be set later 872 ) 873 874 # Set up state provider if not in full refresh mode 875 if force_full_refresh: 876 state_provider: StateProviderBase | None = None 877 else: 878 state_provider = cache.get_state_provider( 879 source_name=self._name, 880 ) 881 state_writer = cache.get_state_writer(source_name=self._name) 882 883 if streams: 884 self.select_streams(streams) 885 886 if not self._selected_stream_names: 887 raise exc.PyAirbyteNoStreamsSelectedError( 888 connector_name=self.name, 889 available_streams=self.get_available_streams(), 890 ) 891 892 try: 893 result = self._read_to_cache( 894 cache=cache, 895 catalog_provider=CatalogProvider( 896 self.get_configured_catalog(force_full_refresh=force_full_refresh) 897 ), 898 stream_names=self._selected_stream_names, 899 state_provider=state_provider, 900 state_writer=state_writer, 901 write_strategy=write_strategy, 902 force_full_refresh=force_full_refresh, 903 skip_validation=skip_validation, 904 progress_tracker=progress_tracker, 905 ) 906 except exc.PyAirbyteInternalError as ex: 907 progress_tracker.log_failure(exception=ex) 908 raise exc.AirbyteConnectorFailedError( 909 connector_name=self.name, 910 log_text=self._last_log_messages, 911 ) from ex 912 except Exception as ex: 913 progress_tracker.log_failure(exception=ex) 914 raise 915 916 progress_tracker.log_success() 917 return result
Read from the connector and write to the cache.
Arguments:
- cache: The cache to write to. If not set, a default cache will be used.
- streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
- write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
- force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
- skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.