airbyte.sources.base
Base class implementation for sources.
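In normal use, `Source` instances are created by the top-level `airbyte.get_source()` helper rather than constructed directly. A minimal, hedged sketch (assuming the `source-faker` connector and its `count` option) before the full module source below:

import airbyte as ab

# get_source() resolves the connector, builds the Executor, and returns a
# configured Source instance.
source = ab.get_source("source-faker", config={"count": 1_000})
source.check()  # Verify the connection and config against the connector spec.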
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Base class implementation for sources."""

from __future__ import annotations

import sys
import threading
import time
import warnings
from itertools import islice
from typing import TYPE_CHECKING, Any, Literal

import yaml
from rich import print  # noqa: A004  # Allow shadowing the built-in
from rich.console import Console
from rich.markdown import Markdown
from rich.markup import escape
from rich.table import Table
from typing_extensions import override

from airbyte_protocol.models import (
    AirbyteCatalog,
    AirbyteMessage,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
    Type,
)

from airbyte import exceptions as exc
from airbyte._connector_base import ConnectorBase
from airbyte._message_iterators import AirbyteMessageIterator
from airbyte._util.temp_files import as_temp_files
from airbyte.caches.util import get_default_cache
from airbyte.datasets._lazy import LazyDataset
from airbyte.progress import ProgressStyle, ProgressTracker
from airbyte.records import StreamRecord, StreamRecordHandler
from airbyte.results import ReadResult
from airbyte.shared.catalog_providers import CatalogProvider
from airbyte.strategies import WriteStrategy


if TYPE_CHECKING:
    from collections.abc import Generator, Iterable, Iterator

    from airbyte_protocol.models import (
        AirbyteStream,
        ConnectorSpecification,
    )

    from airbyte._executors.base import Executor
    from airbyte.caches import CacheBase
    from airbyte.callbacks import ConfigChangeCallback
    from airbyte.datasets._inmemory import InMemoryDataset
    from airbyte.documents import Document
    from airbyte.shared.state_providers import StateProviderBase
    from airbyte.shared.state_writers import StateWriterBase

from airbyte.constants import (
    AB_EXTRACTED_AT_COLUMN,
    AB_META_COLUMN,
    AB_RAW_ID_COLUMN,
)


class Source(ConnectorBase):  # noqa: PLR0904
    """A class representing a source that can be called."""

    connector_type = "source"

    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        streams: str | list[str] | None = None,
        validate: bool = False,
        cursor_key_overrides: dict[str, str] | None = None,
        primary_key_overrides: dict[str, str | list[str]] | None = None,
    ) -> None:
        """Initialize the source.

        If config is provided, it will be validated against the spec if validate is True.
        """
        self._to_be_selected_streams: list[str] | str = []
        """Used to hold selection criteria before catalog is known."""

        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )
        self._config_dict: dict[str, Any] | None = None
        self._last_log_messages: list[str] = []
        self._discovered_catalog: AirbyteCatalog | None = None
        self._selected_stream_names: list[str] = []

        self._cursor_key_overrides: dict[str, str] = {}
        """A mapping of lower-cased stream names to cursor key overrides."""

        self._primary_key_overrides: dict[str, list[str]] = {}
        """A mapping of lower-cased stream names to primary key overrides."""

        if config is not None:
            self.set_config(config, validate=validate)
        if streams is not None:
            self.select_streams(streams)
        if cursor_key_overrides is not None:
            self.set_cursor_keys(**cursor_key_overrides)
        if primary_key_overrides is not None:
            self.set_primary_keys(**primary_key_overrides)

    def set_streams(self, streams: list[str]) -> None:
        """Deprecated. See select_streams()."""
        warnings.warn(
            "The 'set_streams' method is deprecated and will be removed in a future version. "
            "Please use the 'select_streams' method instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.select_streams(streams)

    def set_cursor_key(
        self,
        stream_name: str,
        cursor_key: str,
    ) -> None:
        """Set the cursor for a single stream.

        Note:
            - This does not unset previously set cursors.
            - The cursor key must be a single field name.
            - Not all streams support custom cursors. If a stream does not support custom
              cursors, the override may be ignored.
            - Stream names are case insensitive, while field names are case sensitive.
            - Stream names are not validated by PyAirbyte. If the stream name
              does not exist in the catalog, the override may be ignored.
        """
        self._cursor_key_overrides[stream_name.lower()] = cursor_key

    def set_cursor_keys(
        self,
        **kwargs: str,
    ) -> None:
        """Override the cursor key for one or more streams.

        Usage:
            source.set_cursor_keys(
                stream1="cursor1",
                stream2="cursor2",
            )

        Note:
            - This does not unset previously set cursors.
            - The cursor key must be a single field name.
            - Not all streams support custom cursors. If a stream does not support custom
              cursors, the override may be ignored.
            - Stream names are case insensitive, while field names are case sensitive.
            - Stream names are not validated by PyAirbyte. If the stream name
              does not exist in the catalog, the override may be ignored.
        """
        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})

    def set_primary_key(
        self,
        stream_name: str,
        primary_key: str | list[str],
    ) -> None:
        """Set the primary key for a single stream.

        Note:
            - This does not unset previously set primary keys.
            - The primary key must be a single field name or a list of field names.
            - Not all streams support overriding primary keys. If a stream does not support
              overriding primary keys, the override may be ignored.
            - Stream names are case insensitive, while field names are case sensitive.
            - Stream names are not validated by PyAirbyte. If the stream name
              does not exist in the catalog, the override may be ignored.
        """
        self._primary_key_overrides[stream_name.lower()] = (
            primary_key if isinstance(primary_key, list) else [primary_key]
        )

    def set_primary_keys(
        self,
        **kwargs: str | list[str],
    ) -> None:
        """Override the primary keys for one or more streams.

        This does not unset previously set primary keys.

        Usage:
            source.set_primary_keys(
                stream1="pk1",
                stream2=["pk1", "pk2"],
            )

        Note:
            - This does not unset previously set primary keys.
            - The primary key must be a single field name or a list of field names.
            - Not all streams support overriding primary keys. If a stream does not support
              overriding primary keys, the override may be ignored.
            - Stream names are case insensitive, while field names are case sensitive.
            - Stream names are not validated by PyAirbyte. If the stream name
              does not exist in the catalog, the override may be ignored.
        """
        self._primary_key_overrides.update(
            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
        )

    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
        """Log a warning that stream selection is deferred until the config is set."""
        if streams == "*":
            print(
                "Warning: Config is not set yet. All streams will be selected after config is set.",
                file=sys.stderr,
            )
        else:
            print(
                "Warning: Config is not set yet. "
                f"Streams to be selected after config is set: {streams}",
                file=sys.stderr,
            )

    def select_all_streams(self) -> None:
        """Select all streams.

        This is a more streamlined equivalent to:
        > source.select_streams(source.get_available_streams()).
        """
        if self._config_dict is None:
            self._to_be_selected_streams = "*"
            self._log_warning_preselected_stream(self._to_be_selected_streams)
            return

        self._selected_stream_names = self.get_available_streams()

    def select_streams(self, streams: str | list[str]) -> None:
        """Select the stream names that should be read from the connector.

        Args:
            streams: A list of stream names to select. If set to "*", all streams will be
                selected.

        Currently, if this is not set, all streams will be read.
        """
        if self._config_dict is None:
            self._to_be_selected_streams = streams
            self._log_warning_preselected_stream(streams)
            return

        if streams == "*":
            self.select_all_streams()
            return

        if isinstance(streams, str):
            # If a single stream is provided, convert it to a one-item list
            streams = [streams]

        available_streams = self.get_available_streams()
        for stream in streams:
            if stream not in available_streams:
                raise exc.AirbyteStreamNotFoundError(
                    stream_name=stream,
                    connector_name=self.name,
                    available_streams=available_streams,
                )
        self._selected_stream_names = streams

    def get_selected_streams(self) -> list[str]:
        """Get the selected streams.

        If no streams are selected, return an empty list.
        """
        return self._selected_stream_names

    def set_config(
        self,
        config: dict[str, Any],
        *,
        validate: bool = True,
    ) -> None:
        """Set the config for the connector.

        If validate is True, raise an exception if the config fails validation.

        If validate is False, validation will be deferred until check() or validate_config()
        is called.
        """
        if validate:
            self.validate_config(config)

        self._config_dict = config

        if self._to_be_selected_streams:
            self.select_streams(self._to_be_selected_streams)
            self._to_be_selected_streams = []

    def _discover(self) -> AirbyteCatalog:
        """Call discover on the connector.

        This involves the following steps:
        - Write the config to a temporary file
        - execute the connector with discover --config <config_file>
        - Listen to the messages and return the first AirbyteCatalog that comes along.
        - Make sure the subprocess is killed when the function returns.
        """
        with as_temp_files([self._hydrated_config]) as [config_file]:
            for msg in self._execute(["discover", "--config", config_file]):
                if msg.type == Type.CATALOG and msg.catalog:
                    return msg.catalog
            raise exc.AirbyteConnectorMissingCatalogError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            )

    def get_available_streams(self) -> list[str]:
        """Get the available streams from the discovered catalog."""
        return [s.name for s in self.discovered_catalog.streams]

    def _get_incremental_stream_names(self) -> list[str]:
        """Get the names of streams that support incremental sync."""
        return [
            stream.name
            for stream in self.discovered_catalog.streams
            if SyncMode.incremental in stream.supported_sync_modes
        ]

    @override
    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
        """Call spec on the connector.

        This involves the following steps:
        * execute the connector with spec
        * Listen to the messages and return the first ConnectorSpecification that comes along.
        * Make sure the subprocess is killed when the function returns.
        """
        if force_refresh or self._spec is None:
            for msg in self._execute(["spec"]):
                if msg.type == Type.SPEC and msg.spec:
                    self._spec = msg.spec
                    break

        if self._spec:
            return self._spec

        raise exc.AirbyteConnectorMissingSpecError(
            connector_name=self.name,
            log_text=self._last_log_messages,
        )

    @property
    def config_spec(self) -> dict[str, Any]:
        """Generate a configuration spec for this connector, as a JSON Schema definition.

        This function generates a JSON Schema dictionary with configuration specs for the
        current connector.

        Returns:
            dict: The JSON Schema configuration spec as a dictionary.
        """
        return self._get_spec(force_refresh=True).connectionSpecification

    @property
    def _yaml_spec(self) -> str:
        """Get the spec as a yaml string.

        For now, the primary use case is for writing and debugging a valid config for a source.

        This is private for now because we probably want better polish before exposing this
        as a stable interface. This will also get easier when we have docs links with this info
        for each connector.
        """
        spec_obj: ConnectorSpecification = self._get_spec()
        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
        # convert to a yaml string
        return yaml.dump(spec_dict)

    @property
    def docs_url(self) -> str:
        """Get the URL to the connector's documentation."""
        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
            "source-", ""
        )

    @property
    def discovered_catalog(self) -> AirbyteCatalog:
        """Get the raw catalog for the given streams.

        If the catalog is not yet known, we call discover to get it.
        """
        if self._discovered_catalog is None:
            self._discovered_catalog = self._discover()

        return self._discovered_catalog

    @property
    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
        """Get the configured catalog for the given streams.

        If the raw catalog is not yet known, we call discover to get it.

        If no specific streams are selected, we return a catalog that syncs all available
        streams.

        TODO: We should consider disabling by default the streams that the connector would
        disable by default. (For instance, streams that require a premium license are sometimes
        disabled by default within the connector.)
        """
        # Ensure discovered catalog is cached before we start
        _ = self.discovered_catalog

        # Filter for selected streams if set, otherwise use all available streams:
        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
        return self.get_configured_catalog(streams=streams_filter)

    def get_configured_catalog(
        self,
        streams: Literal["*"] | list[str] | None = None,
    ) -> ConfiguredAirbyteCatalog:
        """Get a configured catalog for the given streams.

        If no streams are provided, the selected streams will be used. If no streams are
        selected, all available streams will be used.

        If '*' is provided, all available streams will be used.
        """
        selected_streams: list[str] = []
        if streams is None:
            selected_streams = self._selected_stream_names or self.get_available_streams()
        elif streams == "*":
            selected_streams = self.get_available_streams()
        elif isinstance(streams, list):
            selected_streams = streams
        else:
            raise exc.PyAirbyteInputError(
                message="Invalid streams argument.",
                input_value=streams,
            )

        return ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=stream,
                    destination_sync_mode=DestinationSyncMode.overwrite,
                    sync_mode=SyncMode.incremental,
                    primary_key=(
                        [self._primary_key_overrides[stream.name.lower()]]
                        if stream.name.lower() in self._primary_key_overrides
                        else stream.source_defined_primary_key
                    ),
                    cursor_field=(
                        [self._cursor_key_overrides[stream.name.lower()]]
                        if stream.name.lower() in self._cursor_key_overrides
                        else stream.default_cursor_field
                    ),
                    # These are unused in the current implementation:
                    generation_id=None,
                    minimum_generation_id=None,
                    sync_id=None,
                )
                for stream in self.discovered_catalog.streams
                if stream.name in selected_streams
            ],
        )

    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
        """Return the JSON Schema spec for the specified stream name."""
        catalog: AirbyteCatalog = self.discovered_catalog
        found: list[AirbyteStream] = [
            stream for stream in catalog.streams if stream.name == stream_name
        ]

        if len(found) == 0:
            raise exc.PyAirbyteInputError(
                message="Stream name does not exist in catalog.",
                input_value=stream_name,
            )

        if len(found) > 1:
            raise exc.PyAirbyteInternalError(
                message="Duplicate streams found with the same name.",
                context={
                    "found_streams": found,
                },
            )

        return found[0].json_schema

    def get_records(
        self,
        stream: str,
        *,
        limit: int | None = None,
        stop_event: threading.Event | None = None,
        normalize_field_names: bool = False,
        prune_undeclared_fields: bool = True,
    ) -> LazyDataset:
        """Read a stream from the connector.

        Args:
            stream: The name of the stream to read.
            limit: The maximum number of records to read. If None, all records will be read.
            stop_event: If set, the event can be triggered by the caller to stop reading records
                and terminate the process.
            normalize_field_names: When `True`, field names will be normalized to lower case,
                with special characters removed. This matches the behavior of PyAirbyte caches
                and most Airbyte destinations.
            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the
                records, which generally matches the behavior of PyAirbyte caches and most
                Airbyte destinations, specifically when you expect the catalog may be stale.
                You can disable this to keep all fields in the records.

        This involves the following steps:
        * Call discover to get the catalog
        * Generate a configured catalog that syncs the given stream in full_refresh mode
        * Write the configured catalog and the config to temporary files
        * execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and return the first AirbyteRecordMessages that come along.
        * Make sure the subprocess is killed when the function returns.
        """
        stop_event = stop_event or threading.Event()
        configured_catalog = self.get_configured_catalog(streams=[stream])
        if len(configured_catalog.streams) == 0:
            raise exc.PyAirbyteInputError(
                message="Requested stream does not exist.",
                context={
                    "stream": stream,
                    "available_streams": self.get_available_streams(),
                    "connector_name": self.name,
                },
            ) from KeyError(stream)

        configured_stream = configured_catalog.streams[0]

        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
            yield from records

        stream_record_handler = StreamRecordHandler(
            json_schema=self.get_stream_json_schema(stream),
            prune_extra_fields=prune_undeclared_fields,
            normalize_keys=normalize_field_names,
        )

        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
        progress_tracker = ProgressTracker(
            ProgressStyle.PLAIN,
            source=self,
            cache=None,
            destination=None,
            expected_streams=[stream],
        )

        iterator: Iterator[dict[str, Any]] = (
            StreamRecord.from_record_message(
                record_message=record.record,
                stream_record_handler=stream_record_handler,
            )
            for record in self._read_with_catalog(
                catalog=configured_catalog,
                progress_tracker=progress_tracker,
                stop_event=stop_event,
            )
            if record.record
        )
        if limit is not None:
            # Stop the iterator after the limit is reached
            iterator = islice(iterator, limit)

        return LazyDataset(
            iterator,
            stream_metadata=configured_stream,
            stop_event=stop_event,
            progress_tracker=progress_tracker,
        )

    def get_documents(
        self,
        stream: str,
        title_property: str | None = None,
        content_properties: list[str] | None = None,
        metadata_properties: list[str] | None = None,
        *,
        render_metadata: bool = False,
    ) -> Iterable[Document]:
        """Read a stream from the connector and return the records as documents.

        If metadata_properties is not set, all properties that are not content will be added to
        the metadata.

        If render_metadata is True, metadata will be rendered in the document, as well as the
        main content.
        """
        return self.get_records(stream).to_documents(
            title_property=title_property,
            content_properties=content_properties,
            metadata_properties=metadata_properties,
            render_metadata=render_metadata,
        )

    def get_samples(
        self,
        streams: list[str] | Literal["*"] | None = None,
        *,
        limit: int = 5,
        on_error: Literal["raise", "ignore", "log"] = "raise",
    ) -> dict[str, InMemoryDataset | None]:
        """Get a sample of records from the given streams."""
        if streams == "*":
            streams = self.get_available_streams()
        elif streams is None:
            streams = self.get_selected_streams()

        results: dict[str, InMemoryDataset | None] = {}
        for stream in streams:
            stop_event = threading.Event()
            try:
                results[stream] = self.get_records(
                    stream,
                    limit=limit,
                    stop_event=stop_event,
                ).fetch_all()
                stop_event.set()
            except Exception as ex:
                results[stream] = None
                if on_error == "ignore":
                    continue

                if on_error == "raise":
                    raise ex from None

                if on_error == "log":
                    print(f"Error fetching sample for stream '{stream}': {ex}")

        return results

    def print_samples(
        self,
        streams: list[str] | Literal["*"] | None = None,
        *,
        limit: int = 5,
        on_error: Literal["raise", "ignore", "log"] = "log",
    ) -> None:
        """Print a sample of records from the given streams."""
        internal_cols: list[str] = [
            AB_EXTRACTED_AT_COLUMN,
            AB_META_COLUMN,
            AB_RAW_ID_COLUMN,
        ]
        col_limit = 10
        if streams == "*":
            streams = self.get_available_streams()
        elif streams is None:
            streams = self.get_selected_streams()

        console = Console()

        console.print(
            Markdown(
                f"# Sample Records from `{self.name}` ({len(streams)} selected streams)",
                justify="left",
            )
        )

        for stream in streams:
            console.print(Markdown(f"## `{stream}` Stream Sample", justify="left"))
            samples = self.get_samples(
                streams=[stream],
                limit=limit,
                on_error=on_error,
            )
            dataset = samples[stream]

            table = Table(
                show_header=True,
                show_lines=True,
            )
            if dataset is None:
                console.print(
                    Markdown("**⚠️ `Error fetching sample records.` ⚠️**"),
                )
                continue

            if len(dataset.column_names) > col_limit:
                # We'll pivot the columns so each column is its own row
                table.add_column("Column Name")
                for _ in range(len(dataset)):
                    table.add_column(overflow="fold")
                for col in dataset.column_names:
                    table.add_row(
                        Markdown(f"**`{col}`**"),
                        *[escape(str(record[col])) for record in dataset],
                    )
            else:
                for col in dataset.column_names:
                    table.add_column(
                        Markdown(f"**`{col}`**"),
                        overflow="fold",
                    )

                for record in dataset:
                    table.add_row(
                        *[
                            escape(str(val))
                            for key, val in record.items()
                            # Exclude internal Airbyte columns.
                            if key not in internal_cols
                        ]
                    )

            console.print(table)

            console.print(Markdown("--------------"))

    def _get_airbyte_message_iterator(
        self,
        *,
        streams: Literal["*"] | list[str] | None = None,
        state_provider: StateProviderBase | None = None,
        progress_tracker: ProgressTracker,
        force_full_refresh: bool = False,
    ) -> AirbyteMessageIterator:
        """Get an AirbyteMessageIterator for this source."""
        return AirbyteMessageIterator(
            self._read_with_catalog(
                catalog=self.get_configured_catalog(streams=streams),
                state=state_provider if not force_full_refresh else None,
                progress_tracker=progress_tracker,
            )
        )

    def _read_with_catalog(
        self,
        catalog: ConfiguredAirbyteCatalog,
        progress_tracker: ProgressTracker,
        *,
        state: StateProviderBase | None = None,
        stop_event: threading.Event | None = None,
    ) -> Generator[AirbyteMessage, None, None]:
        """Call read on the connector.

        This involves the following steps:
        * Write the config to a temporary file
        * execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and return the AirbyteRecordMessages that come along.
        * Send out telemetry on the performed sync (with information about which source was
          used and the type of the cache)
        """
        with as_temp_files(
            [
                self._hydrated_config,
                catalog.model_dump_json(),
                state.to_state_input_file_text() if state else "[]",
            ]
        ) as [
            config_file,
            catalog_file,
            state_file,
        ]:
            message_generator = self._execute(
                [
                    "read",
                    "--config",
                    config_file,
                    "--catalog",
                    catalog_file,
                    "--state",
                    state_file,
                ],
                progress_tracker=progress_tracker,
            )
            for message in progress_tracker.tally_records_read(message_generator):
                if stop_event and stop_event.is_set():
                    progress_tracker._log_sync_cancel()  # noqa: SLF001
                    time.sleep(0.1)
                    return

                yield message

        progress_tracker.log_read_complete()

    def _peek_airbyte_message(
        self,
        message: AirbyteMessage,
        *,
        raise_on_error: bool = True,
    ) -> None:
        """Process an Airbyte message.

        This method handles reading Airbyte messages and taking action, if needed, based on the
        message type. For instance, log messages are logged, records are tallied, and errors are
        raised as exceptions if `raise_on_error` is True.

        Raises:
            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
        """
        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)

    def _log_incremental_streams(
        self,
        *,
        incremental_streams: set[str] | None = None,
    ) -> None:
        """Log the streams which are using incremental sync mode."""
        log_message = (
            "The following streams are currently using incremental sync:\n"
            f"{incremental_streams}\n"
            "To perform a full refresh, set 'force_full_refresh=True' in the 'read()' method."
        )
        print(log_message, file=sys.stderr)

    def read(
        self,
        cache: CacheBase | None = None,
        *,
        streams: str | list[str] | None = None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
    ) -> ReadResult:
        """Read from the connector and write to the cache.

        Args:
            cache: The cache to write to. If not set, a default cache will be used.
            streams: Optional if already set. A list of stream names to select for reading. If
                set to "*", all streams will be selected.
            write_strategy: The strategy to use when writing to the cache. If a string, it must
                be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must
                be one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
                WriteStrategy.AUTO.
            force_full_refresh: If True, the source will operate in full refresh mode.
                Otherwise, streams will be read in incremental mode if supported by the
                connector. This option must be True when using the "replace" strategy.
            skip_validation: If True, PyAirbyte will not pre-validate the input configuration
                before running the connector. This can be helpful in debugging, when you want
                to send configurations to the connector that otherwise might be rejected by
                JSON Schema validation rules.
        """
        cache = cache or get_default_cache()
        progress_tracker = ProgressTracker(
            source=self,
            cache=cache,
            destination=None,
            expected_streams=None,  # Will be set later
        )

        # Set up state provider if not in full refresh mode
        if force_full_refresh:
            state_provider: StateProviderBase | None = None
        else:
            state_provider = cache.get_state_provider(
                source_name=self._name,
            )
        state_writer = cache.get_state_writer(source_name=self._name)

        if streams:
            self.select_streams(streams)

        if not self._selected_stream_names:
            raise exc.PyAirbyteNoStreamsSelectedError(
                connector_name=self.name,
                available_streams=self.get_available_streams(),
            )

        try:
            result = self._read_to_cache(
                cache=cache,
                catalog_provider=CatalogProvider(self.configured_catalog),
                stream_names=self._selected_stream_names,
                state_provider=state_provider,
                state_writer=state_writer,
                write_strategy=write_strategy,
                force_full_refresh=force_full_refresh,
                skip_validation=skip_validation,
                progress_tracker=progress_tracker,
            )
        except exc.PyAirbyteInternalError as ex:
            progress_tracker.log_failure(exception=ex)
            raise exc.AirbyteConnectorFailedError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            ) from ex
        except Exception as ex:
            progress_tracker.log_failure(exception=ex)
            raise

        progress_tracker.log_success()
        return result

    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
        self,
        cache: CacheBase,
        *,
        catalog_provider: CatalogProvider,
        stream_names: list[str],
        state_provider: StateProviderBase | None,
        state_writer: StateWriterBase | None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
        progress_tracker: ProgressTracker,
    ) -> ReadResult:
        """Internal read method."""
        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
            warnings.warn(
                message=(
                    "Using the `REPLACE` strategy without also setting "
                    "`force_full_refresh=True` could result in data loss. "
                    "To silence this warning, use the following: "
                    'warnings.filterwarnings("ignore", '
                    'category="airbyte.warnings.PyAirbyteDataLossWarning")'
                ),
                category=exc.PyAirbyteDataLossWarning,
                stacklevel=1,
            )
        if isinstance(write_strategy, str):
            try:
                write_strategy = WriteStrategy(write_strategy)
            except ValueError:
                raise exc.PyAirbyteInputError(
                    message="Invalid strategy",
                    context={
                        "write_strategy": write_strategy,
                        "available_strategies": [s.value for s in WriteStrategy],
                    },
                ) from None

        # Run optional validation step
        if not skip_validation:
            self.validate_config()

        # Log incremental streams if any are known
        if state_provider and state_provider.known_stream_names:
            # Retrieve the set of known streams that support incremental sync
            incremental_streams = (
                set(self._get_incremental_stream_names())
                & state_provider.known_stream_names
                & set(self.get_selected_streams())
            )
            if incremental_streams:
                self._log_incremental_streams(incremental_streams=incremental_streams)

        airbyte_message_iterator = AirbyteMessageIterator(
            self._read_with_catalog(
                catalog=catalog_provider.configured_catalog,
                state=state_provider,
                progress_tracker=progress_tracker,
            )
        )
        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
            stdin=airbyte_message_iterator,
            catalog_provider=catalog_provider,
            write_strategy=write_strategy,
            state_writer=state_writer,
            progress_tracker=progress_tracker,
        )

        # Flush the WAL, if applicable
        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API

        return ReadResult(
            source_name=self.name,
            progress_tracker=progress_tracker,
            processed_streams=stream_names,
            cache=cache,
        )


__all__ = [
    "Source",
]
A class representing a source that can be called.
    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        streams: str | list[str] | None = None,
        validate: bool = False,
        cursor_key_overrides: dict[str, str] | None = None,
        primary_key_overrides: dict[str, str | list[str]] | None = None,
    ) -> None:
        """Initialize the source.

        If config is provided, it will be validated against the spec if validate is True.
        """
        self._to_be_selected_streams: list[str] | str = []
        """Used to hold selection criteria before catalog is known."""

        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )
        self._config_dict: dict[str, Any] | None = None
        self._last_log_messages: list[str] = []
        self._discovered_catalog: AirbyteCatalog | None = None
        self._selected_stream_names: list[str] = []

        self._cursor_key_overrides: dict[str, str] = {}
        """A mapping of lower-cased stream names to cursor key overrides."""

        self._primary_key_overrides: dict[str, list[str]] = {}
        """A mapping of lower-cased stream names to primary key overrides."""

        if config is not None:
            self.set_config(config, validate=validate)
        if streams is not None:
            self.select_streams(streams)
        if cursor_key_overrides is not None:
            self.set_cursor_keys(**cursor_key_overrides)
        if primary_key_overrides is not None:
            self.set_primary_keys(**primary_key_overrides)
Initialize the source.
If config is provided, it will be validated against the spec if validate is True.
    def set_streams(self, streams: list[str]) -> None:
        """Deprecated. See select_streams()."""
        warnings.warn(
            "The 'set_streams' method is deprecated and will be removed in a future version. "
            "Please use the 'select_streams' method instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.select_streams(streams)
Deprecated. See select_streams().
    def set_cursor_key(
        self,
        stream_name: str,
        cursor_key: str,
    ) -> None:
        """Set the cursor for a single stream.

        Note:
        - This does not unset previously set cursors.
        - The cursor key must be a single field name.
        - Not all streams support custom cursors. If a stream does not support custom cursors,
          the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._cursor_key_overrides[stream_name.lower()] = cursor_key
Set the cursor for a single stream.
Note:
- This does not unset previously set cursors.
- The cursor key must be a single field name.
- Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
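For illustration, a minimal sketch of a cursor override. It assumes a locally installed source-faker connector and a purchases stream with an updated_at field; substitute the stream and field names that exist in your own catalog:

    import airbyte as ab

    source = ab.get_source(
        "source-faker",
        config={"count": 100},
        install_if_missing=True,
    )
    # Stream name matching is case insensitive; the field name is case sensitive.
    source.set_cursor_key("purchases", "updated_at")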
    def set_cursor_keys(
        self,
        **kwargs: str,
    ) -> None:
        """Override the cursor key for one or more streams.

        Usage:
            source.set_cursor_keys(
                stream1="cursor1",
                stream2="cursor2",
            )

        Note:
        - This does not unset previously set cursors.
        - The cursor key must be a single field name.
        - Not all streams support custom cursors. If a stream does not support custom cursors,
          the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})
Override the cursor key for one or more streams.
Usage:
source.set_cursor_keys(
    stream1="cursor1",
    stream2="cursor2",
)
Note:
- This does not unset previously set cursors.
- The cursor key must be a single field name.
- Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
    def set_primary_key(
        self,
        stream_name: str,
        primary_key: str | list[str],
    ) -> None:
        """Set the primary key for a single stream.

        Note:
        - This does not unset previously set primary keys.
        - The primary key must be a single field name or a list of field names.
        - Not all streams support overriding primary keys. If a stream does not support
          overriding primary keys, the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._primary_key_overrides[stream_name.lower()] = (
            primary_key if isinstance(primary_key, list) else [primary_key]
        )
Set the primary key for a single stream.
Note:
- This does not unset previously set primary keys.
- The primary key must be a single field name or a list of field names.
- Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
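A short sketch, again assuming illustrative stream and field names (a users stream keyed on id, and a purchases stream with a composite key):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100}, install_if_missing=True)

    # Single-field primary key:
    source.set_primary_key("users", "id")

    # Composite primary key, passed as a list of field names:
    source.set_primary_key("purchases", ["user_id", "product_id"])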
    def set_primary_keys(
        self,
        **kwargs: str | list[str],
    ) -> None:
        """Override the primary keys for one or more streams.

        This does not unset previously set primary keys.

        Usage:
            source.set_primary_keys(
                stream1="pk1",
                stream2=["pk1", "pk2"],
            )

        Note:
        - This does not unset previously set primary keys.
        - The primary key must be a single field name or a list of field names.
        - Not all streams support overriding primary keys. If a stream does not support
          overriding primary keys, the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._primary_key_overrides.update(
            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
        )
Override the primary keys for one or more streams.
This does not unset previously set primary keys.
Usage:
source.set_primary_keys(
    stream1="pk1",
    stream2=["pk1", "pk2"],
)
Note:
- This does not unset previously set primary keys.
- The primary key must be a single field name or a list of field names.
- Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
    def select_all_streams(self) -> None:
        """Select all streams.

        This is a more streamlined equivalent to:
        > source.select_streams(source.get_available_streams()).
        """
        if self._config_dict is None:
            self._to_be_selected_streams = "*"
            self._log_warning_preselected_stream(self._to_be_selected_streams)
            return

        self._selected_stream_names = self.get_available_streams()
Select all streams.
This is a more streamlined equivalent to:
source.select_streams(source.get_available_streams()).
    def select_streams(self, streams: str | list[str]) -> None:
        """Select the stream names that should be read from the connector.

        Args:
            streams: A list of stream names to select. If set to "*", all streams will be
                selected.

        Currently, if this is not set, all streams will be read.
        """
        if self._config_dict is None:
            self._to_be_selected_streams = streams
            self._log_warning_preselected_stream(streams)
            return

        if streams == "*":
            self.select_all_streams()
            return

        if isinstance(streams, str):
            # If a single stream is provided, convert it to a one-item list
            streams = [streams]

        available_streams = self.get_available_streams()
        for stream in streams:
            if stream not in available_streams:
                raise exc.AirbyteStreamNotFoundError(
                    stream_name=stream,
                    connector_name=self.name,
                    available_streams=available_streams,
                )
        self._selected_stream_names = streams
Select the stream names that should be read from the connector.
Arguments:
- streams: A list of stream names to select. If set to "*", all streams will be selected.
Currently, if this is not set, all streams will be read.
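For example (the stream names here assume the source-faker connector; any name not in the catalog raises AirbyteStreamNotFoundError):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100}, install_if_missing=True)

    # Select a validated subset of streams:
    source.select_streams(["users", "products"])

    # Or pass "*" to select every available stream:
    source.select_streams("*")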
    def get_selected_streams(self) -> list[str]:
        """Get the selected streams.

        If no streams are selected, return an empty list.
        """
        return self._selected_stream_names
Get the selected streams.
If no streams are selected, return an empty list.
    def set_config(
        self,
        config: dict[str, Any],
        *,
        validate: bool = True,
    ) -> None:
        """Set the config for the connector.

        If validate is True, raise an exception if the config fails validation.

        If validate is False, validation will be deferred until check() or validate_config()
        is called.
        """
        if validate:
            self.validate_config(config)

        self._config_dict = config

        if self._to_be_selected_streams:
            self.select_streams(self._to_be_selected_streams)
            self._to_be_selected_streams = []
Set the config for the connector.
If validate is True, raise an exception if the config fails validation.
If validate is False, validation will be deferred until check() or validate_config() is called.
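A sketch of deferred validation, assuming a source created without a config; the check() call inherited from ConnectorBase triggers validation later:

    import airbyte as ab

    source = ab.get_source("source-faker", install_if_missing=True)

    # Defer JSON Schema validation until check() or validate_config():
    source.set_config({"count": 100}, validate=False)
    source.check()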
    def get_available_streams(self) -> list[str]:
        """Get the available streams from the discovered catalog."""
        return [s.name for s in self.discovered_catalog.streams]
Get the available streams from the discovered catalog.
    @property
    def config_spec(self) -> dict[str, Any]:
        """Generate a configuration spec for this connector, as a JSON Schema definition.

        This property returns the JSON Schema configuration spec for the current connector
        as a dictionary.

        Returns:
            dict: The JSON Schema configuration spec as a dictionary.
        """
        return self._get_spec(force_refresh=True).connectionSpecification
Generate a configuration spec for this connector, as a JSON Schema definition.
This property returns the JSON Schema configuration spec for the current connector as a dictionary.
Returns:
dict: The JSON Schema configuration spec as a dictionary.
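Since the spec is a plain JSON Schema dictionary, it can be inspected directly. A brief sketch, assuming the source-faker connector:

    import airbyte as ab

    source = ab.get_source("source-faker", install_if_missing=True)
    spec = source.config_spec
    # List the top-level configuration property names declared by the connector:
    print(sorted(spec.get("properties", {})))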
    @property
    def docs_url(self) -> str:
        """Get the URL to the connector's documentation."""
        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
            "source-", ""
        )
Get the URL to the connector's documentation.
    @property
    def discovered_catalog(self) -> AirbyteCatalog:
        """Get the raw catalog for the given streams.

        If the catalog is not yet known, we call discover to get it.
        """
        if self._discovered_catalog is None:
            self._discovered_catalog = self._discover()

        return self._discovered_catalog
Get the raw catalog for the given streams.
If the catalog is not yet known, we call discover to get it.
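For example, the discovered catalog can be used to list stream names directly (equivalent to get_available_streams()):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100}, install_if_missing=True)
    print([stream.name for stream in source.discovered_catalog.streams])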
    @property
    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
        """Get the configured catalog for the given streams.

        If the raw catalog is not yet known, we call discover to get it.

        If no specific streams are selected, we return a catalog that syncs all available
        streams.

        TODO: We should consider disabling by default the streams that the connector would
        disable by default. (For instance, streams that require a premium license are sometimes
        disabled by default within the connector.)
        """
        # Ensure discovered catalog is cached before we start
        _ = self.discovered_catalog

        # Filter for selected streams if set, otherwise use all available streams:
        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
        return self.get_configured_catalog(streams=streams_filter)
Get the configured catalog for the given streams.
If the raw catalog is not yet known, we call discover to get it.
If no specific streams are selected, we return a catalog that syncs all available streams.
TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)
    def get_configured_catalog(
        self,
        streams: Literal["*"] | list[str] | None = None,
    ) -> ConfiguredAirbyteCatalog:
        """Get a configured catalog for the given streams.

        If no streams are provided, the selected streams will be used. If no streams are
        selected, all available streams will be used.

        If '*' is provided, all available streams will be used.
        """
        selected_streams: list[str] = []
        if streams is None:
            selected_streams = self._selected_stream_names or self.get_available_streams()
        elif streams == "*":
            selected_streams = self.get_available_streams()
        elif isinstance(streams, list):
            selected_streams = streams
        else:
            raise exc.PyAirbyteInputError(
                message="Invalid streams argument.",
                input_value=streams,
            )

        return ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=stream,
                    destination_sync_mode=DestinationSyncMode.overwrite,
                    sync_mode=SyncMode.incremental,
                    primary_key=(
                        [self._primary_key_overrides[stream.name.lower()]]
                        if stream.name.lower() in self._primary_key_overrides
                        else stream.source_defined_primary_key
                    ),
                    cursor_field=(
                        [self._cursor_key_overrides[stream.name.lower()]]
                        if stream.name.lower() in self._cursor_key_overrides
                        else stream.default_cursor_field
                    ),
                    # These are unused in the current implementation:
                    generation_id=None,
                    minimum_generation_id=None,
                    sync_id=None,
                )
                for stream in self.discovered_catalog.streams
                if stream.name in selected_streams
            ],
        )
Get a configured catalog for the given streams.
If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.
If '*' is provided, all available streams will be used.
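A sketch that inspects the catalog PyAirbyte would send to the connector, assuming a users stream exists in the source-faker catalog:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100}, install_if_missing=True)
    catalog = source.get_configured_catalog(streams=["users"])
    for configured_stream in catalog.streams:
        print(configured_stream.stream.name, configured_stream.sync_mode)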
    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
        """Return the JSON Schema spec for the specified stream name."""
        catalog: AirbyteCatalog = self.discovered_catalog
        found: list[AirbyteStream] = [
            stream for stream in catalog.streams if stream.name == stream_name
        ]

        if len(found) == 0:
            raise exc.PyAirbyteInputError(
                message="Stream name does not exist in catalog.",
                input_value=stream_name,
            )

        if len(found) > 1:
            raise exc.PyAirbyteInternalError(
                message="Duplicate streams found with the same name.",
                context={
                    "found_streams": found,
                },
            )

        return found[0].json_schema
Return the JSON Schema spec for the specified stream name.
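For example, to list the declared top-level fields of a stream (the stream name assumes the source-faker connector):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100}, install_if_missing=True)
    schema = source.get_stream_json_schema("users")
    print(sorted(schema.get("properties", {})))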
    def get_records(
        self,
        stream: str,
        *,
        limit: int | None = None,
        stop_event: threading.Event | None = None,
        normalize_field_names: bool = False,
        prune_undeclared_fields: bool = True,
    ) -> LazyDataset:
        """Read a stream from the connector.

        Args:
            stream: The name of the stream to read.
            limit: The maximum number of records to read. If None, all records will be read.
            stop_event: If set, the event can be triggered by the caller to stop reading records
                and terminate the process.
            normalize_field_names: When `True`, field names will be normalized to lower case,
                with special characters removed. This matches the behavior of PyAirbyte caches
                and most Airbyte destinations.
            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the
                records, which generally matches the behavior of PyAirbyte caches and most
                Airbyte destinations, specifically when you expect the catalog may be stale.
                You can disable this to keep all fields in the records.

        This involves the following steps:
        * Call discover to get the catalog
        * Generate a configured catalog that syncs the given stream in full_refresh mode
        * Write the configured catalog and the config to a temporary file
        * Execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and return the first AirbyteRecordMessages that come along.
        * Make sure the subprocess is killed when the function returns.
        """
        stop_event = stop_event or threading.Event()
        configured_catalog = self.get_configured_catalog(streams=[stream])
        if len(configured_catalog.streams) == 0:
            raise exc.PyAirbyteInputError(
                message="Requested stream does not exist.",
                context={
                    "stream": stream,
                    "available_streams": self.get_available_streams(),
                    "connector_name": self.name,
                },
            ) from KeyError(stream)

        configured_stream = configured_catalog.streams[0]

        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
            yield from records

        stream_record_handler = StreamRecordHandler(
            json_schema=self.get_stream_json_schema(stream),
            prune_extra_fields=prune_undeclared_fields,
            normalize_keys=normalize_field_names,
        )

        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
        progress_tracker = ProgressTracker(
            ProgressStyle.PLAIN,
            source=self,
            cache=None,
            destination=None,
            expected_streams=[stream],
        )

        iterator: Iterator[dict[str, Any]] = (
            StreamRecord.from_record_message(
                record_message=record.record,
                stream_record_handler=stream_record_handler,
            )
            for record in self._read_with_catalog(
                catalog=configured_catalog,
                progress_tracker=progress_tracker,
                stop_event=stop_event,
            )
            if record.record
        )
        if limit is not None:
            # Stop the iterator after the limit is reached
            iterator = islice(iterator, limit)

        return LazyDataset(
            iterator,
            stream_metadata=configured_stream,
            stop_event=stop_event,
            progress_tracker=progress_tracker,
        )
Read a stream from the connector.
Arguments:
- stream: The name of the stream to read.
- limit: The maximum number of records to read. If None, all records will be read.
- stop_event: If set, the event can be triggered by the caller to stop reading records and terminate the process.
- normalize_field_names: When True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
- prune_undeclared_fields: When True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.
This involves the following steps:
- Call discover to get the catalog
- Generate a configured catalog that syncs the given stream in full_refresh mode
- Write the configured catalog and the config to a temporary file
- Execute the connector with read --config <config_file> --catalog <catalog_file>
- Listen to the messages and return the first AirbyteRecordMessages that come along.
- Make sure the subprocess is killed when the function returns.
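A minimal usage sketch follows, assuming the source-faker connector and its users stream. Because the returned LazyDataset is lazy, records stream in as you iterate, and the limit argument stops the underlying read early:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 1000}, install_if_missing=True)

    # Iterate lazily over at most ten records:
    for record in source.get_records("users", limit=10):
        print(record)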
    def get_documents(
        self,
        stream: str,
        title_property: str | None = None,
        content_properties: list[str] | None = None,
        metadata_properties: list[str] | None = None,
        *,
        render_metadata: bool = False,
    ) -> Iterable[Document]:
        """Read a stream from the connector and return the records as documents.

        If metadata_properties is not set, all properties that are not content will be added to
        the metadata.

        If render_metadata is True, metadata will be rendered in the document, as well as the
        main content.
        """
        return self.get_records(stream).to_documents(
            title_property=title_property,
            content_properties=content_properties,
            metadata_properties=metadata_properties,
            render_metadata=render_metadata,
        )
Read a stream from the connector and return the records as documents.
If metadata_properties is not set, all properties that are not content will be added to the metadata.
If render_metadata is True, metadata will be rendered in the document, as well as the main content.
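A sketch of document extraction, e.g. for feeding records to an LLM pipeline. The stream and property names here are illustrative and must match fields in your stream's schema:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 10}, install_if_missing=True)

    for doc in source.get_documents(
        stream="products",
        title_property="model",
        content_properties=["make", "year"],
        render_metadata=True,
    ):
        print(doc)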
    def get_samples(
        self,
        streams: list[str] | Literal["*"] | None = None,
        *,
        limit: int = 5,
        on_error: Literal["raise", "ignore", "log"] = "raise",
    ) -> dict[str, InMemoryDataset | None]:
        """Get a sample of records from the given streams."""
        if streams == "*":
            streams = self.get_available_streams()
        elif streams is None:
            streams = self.get_selected_streams()

        results: dict[str, InMemoryDataset | None] = {}
        for stream in streams:
            stop_event = threading.Event()
            try:
                results[stream] = self.get_records(
                    stream,
                    limit=limit,
                    stop_event=stop_event,
                ).fetch_all()
                stop_event.set()
            except Exception as ex:
                results[stream] = None
                if on_error == "ignore":
                    continue

                if on_error == "raise":
                    raise ex from None

                if on_error == "log":
                    print(f"Error fetching sample for stream '{stream}': {ex}")

        return results
Get a sample of records from the given streams.
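For example (a None value in the result marks a stream whose sample failed, when on_error is not "raise"):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100}, install_if_missing=True)
    samples = source.get_samples(streams="*", limit=3, on_error="log")
    for stream_name, dataset in samples.items():
        print(stream_name, "failed" if dataset is None else f"{len(dataset)} records")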
    def print_samples(
        self,
        streams: list[str] | Literal["*"] | None = None,
        *,
        limit: int = 5,
        on_error: Literal["raise", "ignore", "log"] = "log",
    ) -> None:
        """Print a sample of records from the given streams."""
        internal_cols: list[str] = [
            AB_EXTRACTED_AT_COLUMN,
            AB_META_COLUMN,
            AB_RAW_ID_COLUMN,
        ]
        col_limit = 10
        if streams == "*":
            streams = self.get_available_streams()
        elif streams is None:
            streams = self.get_selected_streams()

        console = Console()

        console.print(
            Markdown(
                f"# Sample Records from `{self.name}` ({len(streams)} selected streams)",
                justify="left",
            )
        )

        for stream in streams:
            console.print(Markdown(f"## `{stream}` Stream Sample", justify="left"))
            samples = self.get_samples(
                streams=[stream],
                limit=limit,
                on_error=on_error,
            )
            dataset = samples[stream]

            table = Table(
                show_header=True,
                show_lines=True,
            )
            if dataset is None:
                console.print(
                    Markdown("**⚠️ `Error fetching sample records.` ⚠️**"),
                )
                continue

            if len(dataset.column_names) > col_limit:
                # We'll pivot the columns so each column is its own row
                table.add_column("Column Name")
                for _ in range(len(dataset)):
                    table.add_column(overflow="fold")
                for col in dataset.column_names:
                    table.add_row(
                        Markdown(f"**`{col}`**"),
                        *[escape(str(record[col])) for record in dataset],
                    )
            else:
                for col in dataset.column_names:
                    table.add_column(
                        Markdown(f"**`{col}`**"),
                        overflow="fold",
                    )

                for record in dataset:
                    table.add_row(
                        *[
                            escape(str(val))
                            for key, val in record.items()
                            # Exclude internal Airbyte columns.
                            if key not in internal_cols
                        ]
                    )

            console.print(table)

        console.print(Markdown("--------------"))
Print a sample of records from the given streams.
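Because this method renders directly to the console, usage is a one-liner; for instance:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100}, install_if_missing=True)
    # Pretty-print up to three sample records per available stream:
    source.print_samples(streams="*", limit=3)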
    def read(
        self,
        cache: CacheBase | None = None,
        *,
        streams: str | list[str] | None = None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
    ) -> ReadResult:
        """Read from the connector and write to the cache.

        Args:
            cache: The cache to write to. If not set, a default cache will be used.
            streams: Optional if already set. A list of stream names to select for reading. If
                set to "*", all streams will be selected.
            write_strategy: The strategy to use when writing to the cache. If a string, it must
                be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be
                one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
                WriteStrategy.AUTO.
            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
                streams will be read in incremental mode if supported by the connector. This
                option must be True when using the "replace" strategy.
            skip_validation: If True, PyAirbyte will not pre-validate the input configuration
                before running the connector. This can be helpful in debugging, when you want to
                send configurations to the connector that otherwise might be rejected by JSON
                Schema validation rules.
        """
        cache = cache or get_default_cache()
        progress_tracker = ProgressTracker(
            source=self,
            cache=cache,
            destination=None,
            expected_streams=None,  # Will be set later
        )

        # Set up state provider if not in full refresh mode
        if force_full_refresh:
            state_provider: StateProviderBase | None = None
        else:
            state_provider = cache.get_state_provider(
                source_name=self._name,
            )
        state_writer = cache.get_state_writer(source_name=self._name)

        if streams:
            self.select_streams(streams)

        if not self._selected_stream_names:
            raise exc.PyAirbyteNoStreamsSelectedError(
                connector_name=self.name,
                available_streams=self.get_available_streams(),
            )

        try:
            result = self._read_to_cache(
                cache=cache,
                catalog_provider=CatalogProvider(self.configured_catalog),
                stream_names=self._selected_stream_names,
                state_provider=state_provider,
                state_writer=state_writer,
                write_strategy=write_strategy,
                force_full_refresh=force_full_refresh,
                skip_validation=skip_validation,
                progress_tracker=progress_tracker,
            )
        except exc.PyAirbyteInternalError as ex:
            progress_tracker.log_failure(exception=ex)
            raise exc.AirbyteConnectorFailedError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            ) from ex
        except Exception as ex:
            progress_tracker.log_failure(exception=ex)
            raise

        progress_tracker.log_success()
        return result
Read from the connector and write to the cache.
Arguments:
- cache: The cache to write to. If not set, a default cache will be used.
- streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
- write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
- force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
- skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
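An end-to-end sketch, assuming the source-faker connector, the default local DuckDB cache, and mapping-style access to the result by stream name:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 500}, install_if_missing=True)
    source.select_all_streams()

    # Reads into the default cache; pass a cache= argument to override.
    result = source.read(write_strategy="auto")

    # "users" is an assumed stream name; to_pandas() loads it from the cache.
    users_df = result["users"].to_pandas()
    print(len(users_df))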
Inherited Members
- airbyte._connector_base.ConnectorBase
- config_change_callback
- executor
- name
- get_config
- config_hash
- validate_config
- print_config_spec
- connector_version
- check
- install
- uninstall