airbyte.sources.base
Base class implementation for sources.
```python
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Base class implementation for sources."""

from __future__ import annotations

import sys
import warnings
from typing import TYPE_CHECKING, Any, Literal

import yaml
from rich import print  # noqa: A004  # Allow shadowing the built-in

from airbyte_protocol.models import (
    AirbyteCatalog,
    AirbyteMessage,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
    Type,
)

from airbyte import exceptions as exc
from airbyte._connector_base import ConnectorBase
from airbyte._message_iterators import AirbyteMessageIterator
from airbyte._util.temp_files import as_temp_files
from airbyte.caches.util import get_default_cache
from airbyte.datasets._lazy import LazyDataset
from airbyte.progress import ProgressStyle, ProgressTracker
from airbyte.records import StreamRecord, StreamRecordHandler
from airbyte.results import ReadResult
from airbyte.shared.catalog_providers import CatalogProvider
from airbyte.strategies import WriteStrategy


if TYPE_CHECKING:
    from collections.abc import Generator, Iterable, Iterator

    from airbyte_cdk import ConnectorSpecification
    from airbyte_protocol.models import AirbyteStream

    from airbyte._executors.base import Executor
    from airbyte.caches import CacheBase
    from airbyte.callbacks import ConfigChangeCallback
    from airbyte.documents import Document
    from airbyte.shared.state_providers import StateProviderBase
    from airbyte.shared.state_writers import StateWriterBase


class Source(ConnectorBase):
    """A class representing a source that can be called."""

    connector_type = "source"

    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        streams: str | list[str] | None = None,
        validate: bool = False,
        cursor_key_overrides: dict[str, str] | None = None,
        primary_key_overrides: dict[str, str | list[str]] | None = None,
    ) -> None:
        """Initialize the source.

        If config is provided, it will be validated against the spec if validate is True.
        """
        self._to_be_selected_streams: list[str] | str = []
        """Used to hold selection criteria before catalog is known."""

        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )
        self._config_dict: dict[str, Any] | None = None
        self._last_log_messages: list[str] = []
        self._discovered_catalog: AirbyteCatalog | None = None
        self._selected_stream_names: list[str] = []

        self._cursor_key_overrides: dict[str, str] = {}
        """A mapping of lower-cased stream names to cursor key overrides."""

        self._primary_key_overrides: dict[str, list[str]] = {}
        """A mapping of lower-cased stream names to primary key overrides."""

        if config is not None:
            self.set_config(config, validate=validate)
        if streams is not None:
            self.select_streams(streams)
        if cursor_key_overrides is not None:
            self.set_cursor_keys(**cursor_key_overrides)
        if primary_key_overrides is not None:
            self.set_primary_keys(**primary_key_overrides)

    def set_streams(self, streams: list[str]) -> None:
        """Deprecated. See select_streams()."""
        warnings.warn(
            "The 'set_streams' method is deprecated and will be removed in a future version. "
            "Please use the 'select_streams' method instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.select_streams(streams)

    def set_cursor_key(
        self,
        stream_name: str,
        cursor_key: str,
    ) -> None:
        """Set the cursor for a single stream.

        Note:
        - This does not unset previously set cursors.
        - The cursor key must be a single field name.
        - Not all streams support custom cursors. If a stream does not support custom cursors,
          the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._cursor_key_overrides[stream_name.lower()] = cursor_key

    def set_cursor_keys(
        self,
        **kwargs: str,
    ) -> None:
        """Override the cursor key for one or more streams.

        Usage:
            source.set_cursor_keys(
                stream1="cursor1",
                stream2="cursor2",
            )

        Note:
        - This does not unset previously set cursors.
        - The cursor key must be a single field name.
        - Not all streams support custom cursors. If a stream does not support custom cursors,
          the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})

    def set_primary_key(
        self,
        stream_name: str,
        primary_key: str | list[str],
    ) -> None:
        """Set the primary key for a single stream.

        Note:
        - This does not unset previously set primary keys.
        - The primary key must be a single field name or a list of field names.
        - Not all streams support overriding primary keys. If a stream does not support
          overriding primary keys, the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._primary_key_overrides[stream_name.lower()] = (
            primary_key if isinstance(primary_key, list) else [primary_key]
        )

    def set_primary_keys(
        self,
        **kwargs: str | list[str],
    ) -> None:
        """Override the primary keys for one or more streams.

        This does not unset previously set primary keys.

        Usage:
            source.set_primary_keys(
                stream1="pk1",
                stream2=["pk1", "pk2"],
            )

        Note:
        - This does not unset previously set primary keys.
        - The primary key must be a single field name or a list of field names.
        - Not all streams support overriding primary keys. If a stream does not support
          overriding primary keys, the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._primary_key_overrides.update(
            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
        )

    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
        """Log a warning for stream selections made before the config is set."""
        if streams == "*":
            print(
                "Warning: Config is not set yet. All streams will be selected after config is set.",
                file=sys.stderr,
            )
        else:
            print(
                "Warning: Config is not set yet. "
                f"Streams to be selected after config is set: {streams}",
                file=sys.stderr,
            )

    def select_all_streams(self) -> None:
        """Select all streams.

        This is a more streamlined equivalent to:
        > source.select_streams(source.get_available_streams()).
        """
        if self._config_dict is None:
            self._to_be_selected_streams = "*"
            self._log_warning_preselected_stream(self._to_be_selected_streams)
            return

        self._selected_stream_names = self.get_available_streams()

    def select_streams(self, streams: str | list[str]) -> None:
        """Select the stream names that should be read from the connector.

        Args:
            streams: A list of stream names to select. If set to "*", all streams will
                be selected.

        Currently, if this is not set, all streams will be read.
        """
        if self._config_dict is None:
            self._to_be_selected_streams = streams
            self._log_warning_preselected_stream(streams)
            return

        if streams == "*":
            self.select_all_streams()
            return

        if isinstance(streams, str):
            # If a single stream is provided, convert it to a one-item list
            streams = [streams]

        available_streams = self.get_available_streams()
        for stream in streams:
            if stream not in available_streams:
                raise exc.AirbyteStreamNotFoundError(
                    stream_name=stream,
                    connector_name=self.name,
                    available_streams=available_streams,
                )
        self._selected_stream_names = streams

    def get_selected_streams(self) -> list[str]:
        """Get the selected streams.

        If no streams are selected, return an empty list.
        """
        return self._selected_stream_names

    def set_config(
        self,
        config: dict[str, Any],
        *,
        validate: bool = True,
    ) -> None:
        """Set the config for the connector.

        If validate is True, raise an exception if the config fails validation.

        If validate is False, validation will be deferred until check() or validate_config()
        is called.
        """
        if validate:
            self.validate_config(config)

        self._config_dict = config

        if self._to_be_selected_streams:
            self.select_streams(self._to_be_selected_streams)
            self._to_be_selected_streams = []

    def _discover(self) -> AirbyteCatalog:
        """Call discover on the connector.

        This involves the following steps:
        - Write the config to a temporary file
        - execute the connector with discover --config <config_file>
        - Listen to the messages and return the first AirbyteCatalog that comes along.
        - Make sure the subprocess is killed when the function returns.
        """
        with as_temp_files([self._hydrated_config]) as [config_file]:
            for msg in self._execute(["discover", "--config", config_file]):
                if msg.type == Type.CATALOG and msg.catalog:
                    return msg.catalog
            raise exc.AirbyteConnectorMissingCatalogError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            )

    def get_available_streams(self) -> list[str]:
        """Get the available streams from the discovered catalog."""
        return [s.name for s in self.discovered_catalog.streams]

    def _get_incremental_stream_names(self) -> list[str]:
        """Get the names of streams that support incremental sync."""
        return [
            stream.name
            for stream in self.discovered_catalog.streams
            if SyncMode.incremental in stream.supported_sync_modes
        ]

    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
        """Call spec on the connector.

        This involves the following steps:
        * execute the connector with spec
        * Listen to the messages and return the first ConnectorSpecification that comes along.
        * Make sure the subprocess is killed when the function returns.
        """
        if force_refresh or self._spec is None:
            for msg in self._execute(["spec"]):
                if msg.type == Type.SPEC and msg.spec:
                    self._spec = msg.spec
                    break

        if self._spec:
            return self._spec

        raise exc.AirbyteConnectorMissingSpecError(
            connector_name=self.name,
            log_text=self._last_log_messages,
        )

    @property
    def config_spec(self) -> dict[str, Any]:
        """Generate a configuration spec for this connector, as a JSON Schema definition.

        This function generates a JSON Schema dictionary with configuration specs for the
        current connector, as a dictionary.

        Returns:
            dict: The JSON Schema configuration spec as a dictionary.
        """
        return self._get_spec(force_refresh=True).connectionSpecification

    @property
    def _yaml_spec(self) -> str:
        """Get the spec as a yaml string.

        For now, the primary use case is for writing and debugging a valid config for a source.

        This is private for now because we probably want better polish before exposing this
        as a stable interface. This will also get easier when we have docs links with this info
        for each connector.
        """
        spec_obj: ConnectorSpecification = self._get_spec()
        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
        # convert to a yaml string
        return yaml.dump(spec_dict)

    @property
    def docs_url(self) -> str:
        """Get the URL to the connector's documentation."""
        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
            "source-", ""
        )

    @property
    def discovered_catalog(self) -> AirbyteCatalog:
        """Get the raw catalog for the given streams.

        If the catalog is not yet known, we call discover to get it.
        """
        if self._discovered_catalog is None:
            self._discovered_catalog = self._discover()

        return self._discovered_catalog

    @property
    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
        """Get the configured catalog for the given streams.

        If the raw catalog is not yet known, we call discover to get it.

        If no specific streams are selected, we return a catalog that syncs all
        available streams.

        TODO: We should consider disabling by default the streams that the connector would
        disable by default. (For instance, streams that require a premium license are sometimes
        disabled by default within the connector.)
        """
        # Ensure discovered catalog is cached before we start
        _ = self.discovered_catalog

        # Filter for selected streams if set, otherwise use all available streams:
        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
        return self.get_configured_catalog(streams=streams_filter)

    def get_configured_catalog(
        self,
        streams: Literal["*"] | list[str] | None = None,
    ) -> ConfiguredAirbyteCatalog:
        """Get a configured catalog for the given streams.

        If no streams are provided, the selected streams will be used. If no streams are
        selected, all available streams will be used.

        If '*' is provided, all available streams will be used.
        """
        selected_streams: list[str] = []
        if streams is None:
            selected_streams = self._selected_stream_names or self.get_available_streams()
        elif streams == "*":
            selected_streams = self.get_available_streams()
        elif isinstance(streams, list):
            selected_streams = streams
        else:
            raise exc.PyAirbyteInputError(
                message="Invalid streams argument.",
                input_value=streams,
            )

        return ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=stream,
                    destination_sync_mode=DestinationSyncMode.overwrite,
                    sync_mode=SyncMode.incremental,
                    primary_key=(
                        [self._primary_key_overrides[stream.name.lower()]]
                        if stream.name.lower() in self._primary_key_overrides
                        else stream.source_defined_primary_key
                    ),
                    cursor_field=(
                        [self._cursor_key_overrides[stream.name.lower()]]
                        if stream.name.lower() in self._cursor_key_overrides
                        else stream.default_cursor_field
                    ),
                    # These are unused in the current implementation:
                    generation_id=None,
                    minimum_generation_id=None,
                    sync_id=None,
                )
                for stream in self.discovered_catalog.streams
                if stream.name in selected_streams
            ],
        )

    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
        """Return the JSON Schema spec for the specified stream name."""
        catalog: AirbyteCatalog = self.discovered_catalog
        found: list[AirbyteStream] = [
            stream for stream in catalog.streams if stream.name == stream_name
        ]

        if len(found) == 0:
            raise exc.PyAirbyteInputError(
                message="Stream name does not exist in catalog.",
                input_value=stream_name,
            )

        if len(found) > 1:
            raise exc.PyAirbyteInternalError(
                message="Duplicate streams found with the same name.",
                context={
                    "found_streams": found,
                },
            )

        return found[0].json_schema

    def get_records(
        self,
        stream: str,
        *,
        normalize_field_names: bool = False,
        prune_undeclared_fields: bool = True,
    ) -> LazyDataset:
        """Read a stream from the connector.

        Args:
            stream: The name of the stream to read.
            normalize_field_names: When `True`, field names will be normalized to lower case,
                with special characters removed. This matches the behavior of PyAirbyte caches
                and most Airbyte destinations.
            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the
                records, which generally matches the behavior of PyAirbyte caches and most
                Airbyte destinations, specifically when you expect the catalog may be stale.
                You can disable this to keep all fields in the records.

        This involves the following steps:
        * Call discover to get the catalog
        * Generate a configured catalog that syncs the given stream in full_refresh mode
        * Write the configured catalog and the config to a temporary file
        * execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and yield the AirbyteRecordMessages that come along.
        * Make sure the subprocess is killed when the function returns.
        """
        configured_catalog = self.get_configured_catalog(streams=[stream])
        if len(configured_catalog.streams) == 0:
            raise exc.PyAirbyteInputError(
                message="Requested stream does not exist.",
                context={
                    "stream": stream,
                    "available_streams": self.get_available_streams(),
                    "connector_name": self.name,
                },
            ) from KeyError(stream)

        configured_stream = configured_catalog.streams[0]

        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
            yield from records

        stream_record_handler = StreamRecordHandler(
            json_schema=self.get_stream_json_schema(stream),
            prune_extra_fields=prune_undeclared_fields,
            normalize_keys=normalize_field_names,
        )

        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
        progress_tracker = ProgressTracker(
            ProgressStyle.PLAIN,
            source=self,
            cache=None,
            destination=None,
            expected_streams=[stream],
        )

        iterator: Iterator[dict[str, Any]] = (
            StreamRecord.from_record_message(
                record_message=record.record,
                stream_record_handler=stream_record_handler,
            )
            for record in self._read_with_catalog(
                catalog=configured_catalog,
                progress_tracker=progress_tracker,
            )
            if record.record
        )
        progress_tracker.log_success()
        return LazyDataset(
            iterator,
            stream_metadata=configured_stream,
        )

    def get_documents(
        self,
        stream: str,
        title_property: str | None = None,
        content_properties: list[str] | None = None,
        metadata_properties: list[str] | None = None,
        *,
        render_metadata: bool = False,
    ) -> Iterable[Document]:
        """Read a stream from the connector and return the records as documents.

        If metadata_properties is not set, all properties that are not content will be added
        to the metadata.

        If render_metadata is True, metadata will be rendered in the document, as well as
        the main content.
        """
        return self.get_records(stream).to_documents(
            title_property=title_property,
            content_properties=content_properties,
            metadata_properties=metadata_properties,
            render_metadata=render_metadata,
        )

    def _get_airbyte_message_iterator(
        self,
        *,
        streams: Literal["*"] | list[str] | None = None,
        state_provider: StateProviderBase | None = None,
        progress_tracker: ProgressTracker,
        force_full_refresh: bool = False,
    ) -> AirbyteMessageIterator:
        """Get an AirbyteMessageIterator for this source."""
        return AirbyteMessageIterator(
            self._read_with_catalog(
                catalog=self.get_configured_catalog(streams=streams),
                state=state_provider if not force_full_refresh else None,
                progress_tracker=progress_tracker,
            )
        )

    def _read_with_catalog(
        self,
        catalog: ConfiguredAirbyteCatalog,
        progress_tracker: ProgressTracker,
        state: StateProviderBase | None = None,
    ) -> Generator[AirbyteMessage, None, None]:
        """Call read on the connector.

        This involves the following steps:
        * Write the config to a temporary file
        * execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and return the AirbyteRecordMessages that come along.
        * Send out telemetry on the performed sync (with information about which source was
          used and the type of the cache)
        """
        with as_temp_files(
            [
                self._hydrated_config,
                catalog.model_dump_json(),
                state.to_state_input_file_text() if state else "[]",
            ]
        ) as [
            config_file,
            catalog_file,
            state_file,
        ]:
            message_generator = self._execute(
                [
                    "read",
                    "--config",
                    config_file,
                    "--catalog",
                    catalog_file,
                    "--state",
                    state_file,
                ],
                progress_tracker=progress_tracker,
            )
            yield from progress_tracker.tally_records_read(message_generator)
        progress_tracker.log_read_complete()

    def _peek_airbyte_message(
        self,
        message: AirbyteMessage,
        *,
        raise_on_error: bool = True,
    ) -> None:
        """Process an Airbyte message.

        This method handles reading Airbyte messages and taking action, if needed, based on
        the message type. For instance, log messages are logged, records are tallied, and
        errors are raised as exceptions if `raise_on_error` is True.

        Raises:
            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
        """
        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)

    def _log_incremental_streams(
        self,
        *,
        incremental_streams: set[str] | None = None,
    ) -> None:
        """Log the streams which are using incremental sync mode."""
        log_message = (
            "The following streams are currently using incremental sync:\n"
            f"{incremental_streams}\n"
            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
        )
        print(log_message, file=sys.stderr)

    def read(
        self,
        cache: CacheBase | None = None,
        *,
        streams: str | list[str] | None = None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
    ) -> ReadResult:
        """Read from the connector and write to the cache.

        Args:
            cache: The cache to write to. If not set, a default cache will be used.
            streams: Optional if already set. A list of stream names to select for reading.
                If set to "*", all streams will be selected.
            write_strategy: The strategy to use when writing to the cache. If a string, it
                must be one of "append", "merge", "replace", or "auto". If a WriteStrategy,
                it must be one of WriteStrategy.APPEND, WriteStrategy.MERGE,
                WriteStrategy.REPLACE, or WriteStrategy.AUTO.
            force_full_refresh: If True, the source will operate in full refresh mode.
                Otherwise, streams will be read in incremental mode if supported by the
                connector. This option must be True when using the "replace" strategy.
            skip_validation: If True, PyAirbyte will not pre-validate the input configuration
                before running the connector. This can be helpful in debugging, when you want
                to send configurations to the connector that otherwise might be rejected by
                JSON Schema validation rules.
        """
        cache = cache or get_default_cache()
        progress_tracker = ProgressTracker(
            source=self,
            cache=cache,
            destination=None,
            expected_streams=None,  # Will be set later
        )

        # Set up state provider if not in full refresh mode
        if force_full_refresh:
            state_provider: StateProviderBase | None = None
        else:
            state_provider = cache.get_state_provider(
                source_name=self._name,
            )
        state_writer = cache.get_state_writer(source_name=self._name)

        if streams:
            self.select_streams(streams)

        if not self._selected_stream_names:
            raise exc.PyAirbyteNoStreamsSelectedError(
                connector_name=self.name,
                available_streams=self.get_available_streams(),
            )

        try:
            result = self._read_to_cache(
                cache=cache,
                catalog_provider=CatalogProvider(self.configured_catalog),
                stream_names=self._selected_stream_names,
                state_provider=state_provider,
                state_writer=state_writer,
                write_strategy=write_strategy,
                force_full_refresh=force_full_refresh,
                skip_validation=skip_validation,
                progress_tracker=progress_tracker,
            )
        except exc.PyAirbyteInternalError as ex:
            progress_tracker.log_failure(exception=ex)
            raise exc.AirbyteConnectorFailedError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            ) from ex
        except Exception as ex:
            progress_tracker.log_failure(exception=ex)
            raise

        progress_tracker.log_success()
        return result

    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
        self,
        cache: CacheBase,
        *,
        catalog_provider: CatalogProvider,
        stream_names: list[str],
        state_provider: StateProviderBase | None,
        state_writer: StateWriterBase | None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
        progress_tracker: ProgressTracker,
    ) -> ReadResult:
        """Internal read method."""
        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
            warnings.warn(
                message=(
                    "Using `REPLACE` strategy without also setting `force_full_refresh=True` "
                    "could result in data loss. "
                    "To silence this warning, use the following: "
                    'warnings.filterwarnings("ignore", '
                    'category="airbyte.warnings.PyAirbyteDataLossWarning")'
                ),
                category=exc.PyAirbyteDataLossWarning,
                stacklevel=1,
            )
        if isinstance(write_strategy, str):
            try:
                write_strategy = WriteStrategy(write_strategy)
            except ValueError:
                raise exc.PyAirbyteInputError(
                    message="Invalid strategy",
                    context={
                        "write_strategy": write_strategy,
                        "available_strategies": [s.value for s in WriteStrategy],
                    },
                ) from None

        # Run optional validation step
        if not skip_validation:
            self.validate_config()

        # Log incremental streams if any are known
        if state_provider and state_provider.known_stream_names:
            # Retrieve the set of known streams that support incremental sync
            incremental_streams = (
                set(self._get_incremental_stream_names())
                & state_provider.known_stream_names
                & set(self.get_selected_streams())
            )
            if incremental_streams:
                self._log_incremental_streams(incremental_streams=incremental_streams)

        airbyte_message_iterator = AirbyteMessageIterator(
            self._read_with_catalog(
                catalog=catalog_provider.configured_catalog,
                state=state_provider,
                progress_tracker=progress_tracker,
            )
        )
        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
            stdin=airbyte_message_iterator,
            catalog_provider=catalog_provider,
            write_strategy=write_strategy,
            state_writer=state_writer,
            progress_tracker=progress_tracker,
        )

        # Flush the WAL, if applicable
        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API

        return ReadResult(
            source_name=self.name,
            progress_tracker=progress_tracker,
            processed_streams=stream_names,
            cache=cache,
        )


__all__ = [
    "Source",
]
```
```python
class Source(ConnectorBase):
```
A class representing a source that can be called.
```python
def __init__(
    self,
    executor: Executor,
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    streams: str | list[str] | None = None,
    validate: bool = False,
    cursor_key_overrides: dict[str, str] | None = None,
    primary_key_overrides: dict[str, str | list[str]] | None = None,
) -> None:
```
Initialize the source.
If config is provided, it will be validated against the spec if validate is True.
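In practice, `Source` instances are usually obtained from PyAirbyte's `ab.get_source()` factory, which builds the executor and forwards the configuration to this initializer, rather than by calling the constructor directly. A sketch, with hypothetical stream and field names:

```python
import airbyte as ab

# get_source() handles executor setup; config and stream selection
# are passed through to the Source instance.
source = ab.get_source(
    "source-faker",
    config={"count": 100},  # illustrative config
    streams="*",  # pre-select all streams
)

# Cursor and primary-key overrides can also be applied after construction:
source.set_cursor_keys(users="updated_at")  # hypothetical names
source.set_primary_keys(users="id")
```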
```python
def set_streams(self, streams: list[str]) -> None:
```
Deprecated. See select_streams().
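A quick migration sketch (stream names illustrative):

```python
# Deprecated; emits a DeprecationWarning:
source.set_streams(["users", "products"])

# Preferred equivalent:
source.select_streams(["users", "products"])
```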
```python
def set_cursor_key(
    self,
    stream_name: str,
    cursor_key: str,
) -> None:
```
Set the cursor for a single stream.
Note:
- This does not unset previously set cursors.
- The cursor key must be a single field name.
- Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
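For example, to use a hypothetical `updated_at` field as the cursor for a `users` stream:

```python
# The stream must support custom cursors for this to take effect.
source.set_cursor_key("users", "updated_at")
```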
```python
def set_cursor_keys(
    self,
    **kwargs: str,
) -> None:
```
Override the cursor key for one or more streams.
Usage:
source.set_cursor_keys(
    stream1="cursor1",
    stream2="cursor2",
)
Note:
- This does not unset previously set cursors.
- The cursor key must be a single field name.
- Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_primary_key(
    self,
    stream_name: str,
    primary_key: str | list[str],
) -> None:
    """Set the primary key for a single stream.

    Note:
    - This does not unset previously set primary keys.
    - The primary key must be a single field name or a list of field names.
    - Not all streams support overriding primary keys. If a stream does not support overriding
      primary keys, the override may be ignored.
    - Stream names are case insensitive, while field names are case sensitive.
    - Stream names are not validated by PyAirbyte. If the stream name
      does not exist in the catalog, the override may be ignored.
    """
    self._primary_key_overrides[stream_name.lower()] = (
        primary_key if isinstance(primary_key, list) else [primary_key]
    )
Set the primary key for a single stream.
Note:
- This does not unset previously set primary keys.
- The primary key must be a single field name or a list of field names.
- Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
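For example, both forms described above (hypothetical stream and field names):

# A single-field key and a composite key:
source.set_primary_key("users", "id")
source.set_primary_key("orders", ["user_id", "order_id"])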
def set_primary_keys(
    self,
    **kwargs: str | list[str],
) -> None:
    """Override the primary keys for one or more streams.

    This does not unset previously set primary keys.

    Usage:
        source.set_primary_keys(
            stream1="pk1",
            stream2=["pk1", "pk2"],
        )

    Note:
    - This does not unset previously set primary keys.
    - The primary key must be a single field name or a list of field names.
    - Not all streams support overriding primary keys. If a stream does not support overriding
      primary keys, the override may be ignored.
    - Stream names are case insensitive, while field names are case sensitive.
    - Stream names are not validated by PyAirbyte. If the stream name
      does not exist in the catalog, the override may be ignored.
    """
    self._primary_key_overrides.update(
        {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
    )
Override the primary keys for one or more streams.
This does not unset previously set primary keys.
Usage:
source.set_primary_keys(
    stream1="pk1",
    stream2=["pk1", "pk2"],
)
Note:
- This does not unset previously set primary keys.
- The primary key must be a single field name or a list of field names.
- Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def select_all_streams(self) -> None:
    """Select all streams.

    This is a more streamlined equivalent to:
    > source.select_streams(source.get_available_streams()).
    """
    if self._config_dict is None:
        self._to_be_selected_streams = "*"
        self._log_warning_preselected_stream(self._to_be_selected_streams)
        return

    self._selected_stream_names = self.get_available_streams()
Select all streams.
This is a more streamlined equivalent to:
source.select_streams(source.get_available_streams()).
def select_streams(self, streams: str | list[str]) -> None:
    """Select the stream names that should be read from the connector.

    Args:
        streams: A list of stream names to select. If set to "*", all streams will be selected.

    Currently, if this is not set, all streams will be read.
    """
    if self._config_dict is None:
        self._to_be_selected_streams = streams
        self._log_warning_preselected_stream(streams)
        return

    if streams == "*":
        self.select_all_streams()
        return

    if isinstance(streams, str):
        # If a single stream is provided, convert it to a one-item list
        streams = [streams]

    available_streams = self.get_available_streams()
    for stream in streams:
        if stream not in available_streams:
            raise exc.AirbyteStreamNotFoundError(
                stream_name=stream,
                connector_name=self.name,
                available_streams=available_streams,
            )
    self._selected_stream_names = streams
Select the stream names that should be read from the connector.
Arguments:
- streams: A list of stream names to select. If set to "*", all streams will be selected.
Currently, if this is not set, all streams will be read.
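For example, a sketch with hypothetical stream names:

# Select two streams by name. Once config is set, unknown names
# raise AirbyteStreamNotFoundError:
source.select_streams(["users", "purchases"])

# Equivalent to select_all_streams():
source.select_streams("*")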
def get_selected_streams(self) -> list[str]:
    """Get the selected streams.

    If no streams are selected, return an empty list.
    """
    return self._selected_stream_names
Get the selected streams.
If no streams are selected, return an empty list.
def set_config(
    self,
    config: dict[str, Any],
    *,
    validate: bool = True,
) -> None:
    """Set the config for the connector.

    If validate is True, raise an exception if the config fails validation.

    If validate is False, validation will be deferred until check() or validate_config()
    is called.
    """
    if validate:
        self.validate_config(config)

    self._config_dict = config

    if self._to_be_selected_streams:
        self.select_streams(self._to_be_selected_streams)
        self._to_be_selected_streams = []
Set the config for the connector.
If validate is True, raise an exception if the config fails validation.
If validate is False, validation will be deferred until check() or validate_config() is called.
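For example, a sketch of the deferred-validation path (the config key is hypothetical):

# Validation is skipped here and deferred until check() or
# validate_config() is called:
source.set_config({"api_key": "..."}, validate=False)
source.check()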
def get_available_streams(self) -> list[str]:
    """Get the available stream names from the discovered catalog."""
    return [s.name for s in self.discovered_catalog.streams]

Get the available stream names from the discovered catalog.
@property
def config_spec(self) -> dict[str, Any]:
    """Generate a configuration spec for this connector, as a JSON Schema definition.

    This function returns the configuration spec for the current connector, as a
    JSON Schema dictionary.

    Returns:
        dict: The JSON Schema configuration spec as a dictionary.
    """
    return self._get_spec(force_refresh=True).connectionSpecification
Generate a configuration spec for this connector, as a JSON Schema definition.
This function returns the configuration spec for the current connector, as a JSON Schema dictionary.
Returns:
dict: The JSON Schema configuration spec as a dictionary.
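For example, since the spec is a plain dictionary, it can be inspected directly:

import json

# Pretty-print the connector's JSON Schema config spec:
print(json.dumps(source.config_spec, indent=2))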
@property
def docs_url(self) -> str:
    """Get the URL to the connector's documentation."""
    return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
        "source-", ""
    )
Get the URL to the connector's documentation.
@property
def discovered_catalog(self) -> AirbyteCatalog:
    """Get the raw catalog for the given streams.

    If the catalog is not yet known, we call discover to get it.
    """
    if self._discovered_catalog is None:
        self._discovered_catalog = self._discover()

    return self._discovered_catalog
Get the raw catalog for the given streams.
If the catalog is not yet known, we call discover to get it.
@property
def configured_catalog(self) -> ConfiguredAirbyteCatalog:
    """Get the configured catalog for the given streams.

    If the raw catalog is not yet known, we call discover to get it.

    If no specific streams are selected, we return a catalog that syncs all available streams.

    TODO: We should consider disabling by default the streams that the connector would
    disable by default. (For instance, streams that require a premium license are sometimes
    disabled by default within the connector.)
    """
    # Ensure discovered catalog is cached before we start
    _ = self.discovered_catalog

    # Filter for selected streams if set, otherwise use all available streams:
    streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
    return self.get_configured_catalog(streams=streams_filter)
Get the configured catalog for the given streams.
If the raw catalog is not yet known, we call discover to get it.
If no specific streams are selected, we return a catalog that syncs all available streams.
TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)
def get_configured_catalog(
    self,
    streams: Literal["*"] | list[str] | None = None,
) -> ConfiguredAirbyteCatalog:
    """Get a configured catalog for the given streams.

    If no streams are provided, the selected streams will be used. If no streams are selected,
    all available streams will be used.

    If '*' is provided, all available streams will be used.
    """
    selected_streams: list[str] = []
    if streams is None:
        selected_streams = self._selected_stream_names or self.get_available_streams()
    elif streams == "*":
        selected_streams = self.get_available_streams()
    elif isinstance(streams, list):
        selected_streams = streams
    else:
        raise exc.PyAirbyteInputError(
            message="Invalid streams argument.",
            input_value=streams,
        )

    return ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=stream,
                destination_sync_mode=DestinationSyncMode.overwrite,
                sync_mode=SyncMode.incremental,
                primary_key=(
                    [self._primary_key_overrides[stream.name.lower()]]
                    if stream.name.lower() in self._primary_key_overrides
                    else stream.source_defined_primary_key
                ),
                cursor_field=(
                    [self._cursor_key_overrides[stream.name.lower()]]
                    if stream.name.lower() in self._cursor_key_overrides
                    else stream.default_cursor_field
                ),
                # These are unused in the current implementation:
                generation_id=None,
                minimum_generation_id=None,
                sync_id=None,
            )
            for stream in self.discovered_catalog.streams
            if stream.name in selected_streams
        ],
    )
Get a configured catalog for the given streams.
If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.
If '*' is provided, all available streams will be used.
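For example, a sketch inspecting the resulting catalog, including any cursor overrides that were applied:

# Build a catalog for all available streams and print the effective
# cursor field of each configured stream:
catalog = source.get_configured_catalog(streams="*")
for configured_stream in catalog.streams:
    print(configured_stream.stream.name, configured_stream.cursor_field)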
def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
    """Return the JSON Schema spec for the specified stream name."""
    catalog: AirbyteCatalog = self.discovered_catalog
    found: list[AirbyteStream] = [
        stream for stream in catalog.streams if stream.name == stream_name
    ]

    if len(found) == 0:
        raise exc.PyAirbyteInputError(
            message="Stream name does not exist in catalog.",
            input_value=stream_name,
        )

    if len(found) > 1:
        raise exc.PyAirbyteInternalError(
            message="Duplicate streams found with the same name.",
            context={
                "found_streams": found,
            },
        )

    return found[0].json_schema
Return the JSON Schema spec for the specified stream name.
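For example (hypothetical stream name):

# List the declared top-level properties of the "users" stream:
schema = source.get_stream_json_schema("users")
print(sorted(schema.get("properties", {})))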
def get_records(
    self,
    stream: str,
    *,
    normalize_field_names: bool = False,
    prune_undeclared_fields: bool = True,
) -> LazyDataset:
    """Read a stream from the connector.

    Args:
        stream: The name of the stream to read.
        normalize_field_names: When `True`, field names will be normalized to lower case, with
            special characters removed. This matches the behavior of PyAirbyte caches and most
            Airbyte destinations.
        prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
            which generally matches the behavior of PyAirbyte caches and most Airbyte
            destinations, specifically when you expect the catalog may be stale. You can disable
            this to keep all fields in the records.

    This involves the following steps:
    * Call discover to get the catalog
    * Generate a configured catalog that syncs the given stream in full_refresh mode
    * Write the configured catalog and the config to a temporary file
    * execute the connector with read --config <config_file> --catalog <catalog_file>
    * Listen to the messages and return the first AirbyteRecordMessages that come along.
    * Make sure the subprocess is killed when the function returns.
    """
    configured_catalog = self.get_configured_catalog(streams=[stream])
    if len(configured_catalog.streams) == 0:
        raise exc.PyAirbyteInputError(
            message="Requested stream does not exist.",
            context={
                "stream": stream,
                "available_streams": self.get_available_streams(),
                "connector_name": self.name,
            },
        ) from KeyError(stream)

    configured_stream = configured_catalog.streams[0]

    def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
        yield from records

    stream_record_handler = StreamRecordHandler(
        json_schema=self.get_stream_json_schema(stream),
        prune_extra_fields=prune_undeclared_fields,
        normalize_keys=normalize_field_names,
    )

    # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
    progress_tracker = ProgressTracker(
        ProgressStyle.PLAIN,
        source=self,
        cache=None,
        destination=None,
        expected_streams=[stream],
    )

    iterator: Iterator[dict[str, Any]] = (
        StreamRecord.from_record_message(
            record_message=record.record,
            stream_record_handler=stream_record_handler,
        )
        for record in self._read_with_catalog(
            catalog=configured_catalog,
            progress_tracker=progress_tracker,
        )
        if record.record
    )
    progress_tracker.log_success()
    return LazyDataset(
        iterator,
        stream_metadata=configured_stream,
    )
Read a stream from the connector.
Arguments:
- stream: The name of the stream to read.
- normalize_field_names: When True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
- prune_undeclared_fields: When True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.

This involves the following steps:
- Call discover to get the catalog.
- Generate a configured catalog that syncs the given stream in full_refresh mode.
- Write the configured catalog and the config to a temporary file.
- Execute the connector with read --config <config_file> --catalog <catalog_file>.
- Listen to the messages and return the first AirbyteRecordMessages that come along.
- Make sure the subprocess is killed when the function returns.
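For example, a sketch with a hypothetical stream and field name:

# Records are yielded lazily as the connector emits them; the
# returned LazyDataset is a plain iterable of dict-like records:
for record in source.get_records("users"):
    print(record["id"])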
def get_documents(
    self,
    stream: str,
    title_property: str | None = None,
    content_properties: list[str] | None = None,
    metadata_properties: list[str] | None = None,
    *,
    render_metadata: bool = False,
) -> Iterable[Document]:
    """Read a stream from the connector and return the records as documents.

    If metadata_properties is not set, all properties that are not content will be added to
    the metadata.

    If render_metadata is True, metadata will be rendered in the document, as well as
    the main content.
    """
    return self.get_records(stream).to_documents(
        title_property=title_property,
        content_properties=content_properties,
        metadata_properties=metadata_properties,
        render_metadata=render_metadata,
    )
Read a stream from the connector and return the records as documents.
If metadata_properties is not set, all properties that are not content will be added to the metadata.
If render_metadata is True, metadata will be rendered in the document, as well as the main content.
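For example, a sketch with hypothetical stream and property names:

# Map each record of the "articles" stream to a document with a
# title and body, e.g. for a downstream LLM or search pipeline:
for doc in source.get_documents(
    "articles",
    title_property="title",
    content_properties=["body"],
):
    print(doc)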
def read(
    self,
    cache: CacheBase | None = None,
    *,
    streams: str | list[str] | None = None,
    write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
    force_full_refresh: bool = False,
    skip_validation: bool = False,
) -> ReadResult:
    """Read from the connector and write to the cache.

    Args:
        cache: The cache to write to. If not set, a default cache will be used.
        streams: Optional if already set. A list of stream names to select for reading. If set
            to "*", all streams will be selected.
        write_strategy: The strategy to use when writing to the cache. If a string, it must be
            one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
            of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
            WriteStrategy.AUTO.
        force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
            streams will be read in incremental mode if supported by the connector. This option
            must be True when using the "replace" strategy.
        skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
            running the connector. This can be helpful in debugging, when you want to send
            configurations to the connector that otherwise might be rejected by JSON Schema
            validation rules.
    """
    cache = cache or get_default_cache()
    progress_tracker = ProgressTracker(
        source=self,
        cache=cache,
        destination=None,
        expected_streams=None,  # Will be set later
    )

    # Set up state provider if not in full refresh mode
    if force_full_refresh:
        state_provider: StateProviderBase | None = None
    else:
        state_provider = cache.get_state_provider(
            source_name=self._name,
        )
    state_writer = cache.get_state_writer(source_name=self._name)

    if streams:
        self.select_streams(streams)

    if not self._selected_stream_names:
        raise exc.PyAirbyteNoStreamsSelectedError(
            connector_name=self.name,
            available_streams=self.get_available_streams(),
        )

    try:
        result = self._read_to_cache(
            cache=cache,
            catalog_provider=CatalogProvider(self.configured_catalog),
            stream_names=self._selected_stream_names,
            state_provider=state_provider,
            state_writer=state_writer,
            write_strategy=write_strategy,
            force_full_refresh=force_full_refresh,
            skip_validation=skip_validation,
            progress_tracker=progress_tracker,
        )
    except exc.PyAirbyteInternalError as ex:
        progress_tracker.log_failure(exception=ex)
        raise exc.AirbyteConnectorFailedError(
            connector_name=self.name,
            log_text=self._last_log_messages,
        ) from ex
    except Exception as ex:
        progress_tracker.log_failure(exception=ex)
        raise

    progress_tracker.log_success()
    return result
Read from the connector and write to the cache.
Arguments:
- cache: The cache to write to. If not set, a default cache will be used.
- streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
- write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
- force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
- skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
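For example, a minimal sketch using the default cache; the "users" stream name is hypothetical, and the result is assumed to behave as a mapping of stream names to cached datasets:

# Read all selected streams into the default cache, then load one
# stream as a pandas DataFrame:
result = source.read()
df = result["users"].to_pandas()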
Inherited Members
- airbyte._connector_base.ConnectorBase
- config_change_callback
- executor
- name
- get_config
- config_hash
- validate_config
- print_config_spec
- connector_version
- check
- install
- uninstall