airbyte.sources.base
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from __future__ import annotations

import json
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast

import jsonschema
import pendulum
import yaml
from rich import print
from rich.syntax import Syntax
from typing_extensions import Literal

from airbyte_protocol.models import (
    AirbyteCatalog,
    AirbyteMessage,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    ConnectorSpecification,
    DestinationSyncMode,
    Status,
    SyncMode,
    TraceType,
    Type,
)

from airbyte import exceptions as exc
from airbyte._future_cdk.catalog_providers import CatalogProvider
from airbyte._util.telemetry import (
    EventState,
    EventType,
    log_config_validation_result,
    log_source_check_result,
    send_telemetry,
)
from airbyte._util.temp_files import as_temp_files
from airbyte.caches.util import get_default_cache
from airbyte.datasets._lazy import LazyDataset
from airbyte.progress import progress
from airbyte.records import StreamRecord
from airbyte.results import ReadResult
from airbyte.strategies import WriteStrategy
from airbyte.warnings import PyAirbyteDataLossWarning


if TYPE_CHECKING:
    from collections.abc import Generator, Iterable, Iterator

    from airbyte_protocol.models.airbyte_protocol import AirbyteStream

    from airbyte._executor import Executor
    from airbyte._future_cdk.state_providers import StateProviderBase
    from airbyte.caches import CacheBase
    from airbyte.documents import Document


class Source:  # noqa: PLR0904  # Ignore max public methods
    """A class representing a source that can be called."""

    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        streams: str | list[str] | None = None,
        *,
        validate: bool = False,
    ) -> None:
        """Initialize the source.

        If config is provided, it will be validated against the spec if validate is True.
        """
        self.executor = executor
        self.name = name
        self._processed_records = 0
        self._config_dict: dict[str, Any] | None = None
        self._last_log_messages: list[str] = []
        self._discovered_catalog: AirbyteCatalog | None = None
        self._spec: ConnectorSpecification | None = None
        self._selected_stream_names: list[str] = []
        if config is not None:
            self.set_config(config, validate=validate)
        if streams is not None:
            self.select_streams(streams)

        self._deployed_api_root: str | None = None
        self._deployed_workspace_id: str | None = None
        self._deployed_source_id: str | None = None

    def set_streams(self, streams: list[str]) -> None:
        """Deprecated. See select_streams()."""
        warnings.warn(
            "The 'set_streams' method is deprecated and will be removed in a future version. "
            "Please use the 'select_streams' method instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.select_streams(streams)

    def select_all_streams(self) -> None:
        """Select all streams.

        This is a more streamlined equivalent to:
        > source.select_streams(source.get_available_streams()).
        """
        self._selected_stream_names = self.get_available_streams()

    def select_streams(self, streams: str | list[str]) -> None:
        """Select the stream names that should be read from the connector.

        Args:
        - streams: A list of stream names to select. If set to "*", all streams will be selected.

        Currently, if this is not set, all streams will be read.
        """
        if streams == "*":
            self.select_all_streams()
            return

        if isinstance(streams, str):
            # If a single stream is provided, convert it to a one-item list
            streams = [streams]

        available_streams = self.get_available_streams()
        for stream in streams:
            if stream not in available_streams:
                raise exc.AirbyteStreamNotFoundError(
                    stream_name=stream,
                    connector_name=self.name,
                    available_streams=available_streams,
                )
        self._selected_stream_names = streams

    def get_selected_streams(self) -> list[str]:
        """Get the selected streams.

        If no streams are selected, return an empty list.
        """
        return self._selected_stream_names

    def set_config(
        self,
        config: dict[str, Any],
        *,
        validate: bool = True,
    ) -> None:
        """Set the config for the connector.

        If validate is True, raise an exception if the config fails validation.

        If validate is False, validation will be deferred until check() or validate_config()
        is called.
        """
        if validate:
            self.validate_config(config)

        self._config_dict = config

    def get_config(self) -> dict[str, Any]:
        """Get the config for the connector."""
        return self._config

    @property
    def _config(self) -> dict[str, Any]:
        if self._config_dict is None:
            raise exc.AirbyteConnectorConfigurationMissingError(
                guidance="Provide via get_source() or set_config()"
            )
        return self._config_dict

    def _discover(self) -> AirbyteCatalog:
        """Call discover on the connector.

        This involves the following steps:
        * Write the config to a temporary file
        * execute the connector with discover --config <config_file>
        * Listen to the messages and return the first AirbyteCatalog that comes along.
        * Make sure the subprocess is killed when the function returns.
        """
        with as_temp_files([self._config]) as [config_file]:
            for msg in self._execute(["discover", "--config", config_file]):
                if msg.type == Type.CATALOG and msg.catalog:
                    return msg.catalog
            raise exc.AirbyteConnectorMissingCatalogError(
                log_text=self._last_log_messages,
            )

    def validate_config(self, config: dict[str, Any] | None = None) -> None:
        """Validate the config against the spec.

        If config is not provided, the already-set config will be validated.
        """
        spec = self._get_spec(force_refresh=False)
        config = self._config if config is None else config
        try:
            jsonschema.validate(config, spec.connectionSpecification)
            log_config_validation_result(
                name=self.name,
                state=EventState.SUCCEEDED,
            )
        except jsonschema.ValidationError as ex:
            validation_ex = exc.AirbyteConnectorValidationFailedError(
                message="The provided config is not valid.",
                context={
                    "error_message": ex.message,
                    "error_path": ex.path,
                    "error_instance": ex.instance,
                    "error_schema": ex.schema,
                },
            )
            log_config_validation_result(
                name=self.name,
                state=EventState.FAILED,
                exception=validation_ex,
            )
            raise validation_ex from ex

    def get_available_streams(self) -> list[str]:
        """Get the available streams from the discovered catalog."""
        return [s.name for s in self.discovered_catalog.streams]

    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
        """Call spec on the connector.

        This involves the following steps:
        * execute the connector with spec
        * Listen to the messages and return the first ConnectorSpecification that comes along.
        * Make sure the subprocess is killed when the function returns.
        """
        if force_refresh or self._spec is None:
            for msg in self._execute(["spec"]):
                if msg.type == Type.SPEC and msg.spec:
                    self._spec = msg.spec
                    break

        if self._spec:
            return self._spec

        raise exc.AirbyteConnectorMissingSpecError(
            log_text=self._last_log_messages,
        )

    @property
    def config_spec(self) -> dict[str, Any]:
        """Generate a configuration spec for this connector, as a JSON Schema definition.

        This function returns a JSON Schema dictionary describing the configuration options
        for the current connector.

        Returns:
            dict: The JSON Schema configuration spec as a dictionary.
        """
        return self._get_spec(force_refresh=True).connectionSpecification

    def print_config_spec(
        self,
        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
        *,
        output_file: Path | str | None = None,
    ) -> None:
        """Print the configuration spec for this connector.

        Args:
        - format: The format to print the spec in. Must be "yaml" or "json".
        - output_file: Optional. If set, the spec will be written to the given file path.
          Otherwise, it will be printed to the console.
        """
        if format not in {"yaml", "json"}:
            raise exc.PyAirbyteInputError(
                message="Invalid format. Expected 'yaml' or 'json'",
                input_value=format,
            )
        if isinstance(output_file, str):
            output_file = Path(output_file)

        if format == "yaml":
            content = yaml.dump(self.config_spec, indent=2)
        elif format == "json":
            content = json.dumps(self.config_spec, indent=2)

        if output_file:
            output_file.write_text(content)
            return

        syntax_highlighted = Syntax(content, format)
        print(syntax_highlighted)

    @property
    def _yaml_spec(self) -> str:
        """Get the spec as a yaml string.

        For now, the primary use case is for writing and debugging a valid config for a source.

        This is private for now because we probably want better polish before exposing this
        as a stable interface. This will also get easier when we have docs links with this info
        for each connector.
        """
        spec_obj: ConnectorSpecification = self._get_spec()
        spec_dict = spec_obj.dict(exclude_unset=True)
        # convert to a yaml string
        return yaml.dump(spec_dict)

    @property
    def docs_url(self) -> str:
        """Get the URL to the connector's documentation."""
        # TODO: Replace with docs URL from metadata when available
        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
            "source-", ""
        )

    @property
    def discovered_catalog(self) -> AirbyteCatalog:
        """Get the raw catalog for the given streams.

        If the catalog is not yet known, we call discover to get it.
        """
        if self._discovered_catalog is None:
            self._discovered_catalog = self._discover()

        return self._discovered_catalog

    @property
    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
        """Get the configured catalog for the given streams.

        If the raw catalog is not yet known, we call discover to get it.

        If no specific streams are selected, we return a catalog that syncs all available streams.

        TODO: We should consider disabling by default the streams that the connector would
        disable by default. (For instance, streams that require a premium license are sometimes
        disabled by default within the connector.)
        """
        # Ensure discovered catalog is cached before we start
        _ = self.discovered_catalog

        # Filter for selected streams if set, otherwise use all available streams:
        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()

        return ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=stream,
                    destination_sync_mode=DestinationSyncMode.overwrite,
                    primary_key=stream.source_defined_primary_key,
                    # TODO: The below assumes all sources can coalesce from incremental sync to
                    # full_table as needed. CDK supports this, so it might be safe:
                    sync_mode=SyncMode.incremental,
                )
                for stream in self.discovered_catalog.streams
                if stream.name in streams_filter
            ],
        )

    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
        """Return the JSON Schema spec for the specified stream name."""
        catalog: AirbyteCatalog = self.discovered_catalog
        found: list[AirbyteStream] = [
            stream for stream in catalog.streams if stream.name == stream_name
        ]

        if len(found) == 0:
            raise exc.PyAirbyteInputError(
                message="Stream name does not exist in catalog.",
                input_value=stream_name,
            )

        if len(found) > 1:
            raise exc.PyAirbyteInternalError(
                message="Duplicate streams found with the same name.",
                context={
                    "found_streams": found,
                },
            )

        return found[0].json_schema

    def get_records(self, stream: str) -> LazyDataset:
        """Read a stream from the connector.

        This involves the following steps:
        * Call discover to get the catalog
        * Generate a configured catalog that syncs the given stream in full_refresh mode
        * Write the configured catalog and the config to a temporary file
        * execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and return the AirbyteRecordMessages that come along.
        * Make sure the subprocess is killed when the function returns.
        """
        discovered_catalog: AirbyteCatalog = self.discovered_catalog
        configured_catalog = ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=s,
                    sync_mode=SyncMode.full_refresh,
                    destination_sync_mode=DestinationSyncMode.overwrite,
                )
                for s in discovered_catalog.streams
                if s.name == stream
            ],
        )
        if len(configured_catalog.streams) == 0:
            raise exc.PyAirbyteInputError(
                message="Requested stream does not exist.",
                context={
                    "stream": stream,
                    "available_streams": self.get_available_streams(),
                    "connector_name": self.name,
                },
            ) from KeyError(stream)

        configured_stream = configured_catalog.streams[0]
        all_properties = cast(
            list[str], list(configured_stream.stream.json_schema["properties"].keys())
        )

        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
            self._log_sync_start(cache=None)
            yield from records
            self._log_sync_success(cache=None)

        iterator: Iterator[dict[str, Any]] = _with_logging(
            records=(  # Generator expression yields StreamRecord objects for each record
                StreamRecord.from_record_message(
                    record_message=record.record,
                    expected_keys=all_properties,
                    prune_extra_fields=True,
                )
                for record in self._read_with_catalog(configured_catalog)
                if record.record
            )
        )
        return LazyDataset(
            iterator,
            stream_metadata=configured_stream,
        )

    @property
    def connector_version(self) -> str | None:
        """Return the version of the connector as reported by the executor.

        Returns None if the version cannot be determined.
        """
        return self.executor.get_installed_version()

    def get_documents(
        self,
        stream: str,
        title_property: str | None = None,
        content_properties: list[str] | None = None,
        metadata_properties: list[str] | None = None,
        *,
        render_metadata: bool = False,
    ) -> Iterable[Document]:
        """Read a stream from the connector and return the records as documents.

        If metadata_properties is not set, all properties that are not content will be added to
        the metadata.

        If render_metadata is True, metadata will be rendered in the document, as well as the
        main content.
        """
        return self.get_records(stream).to_documents(
            title_property=title_property,
            content_properties=content_properties,
            metadata_properties=metadata_properties,
            render_metadata=render_metadata,
        )

    def check(self) -> None:
        """Call check on the connector.

        This involves the following steps:
        * Write the config to a temporary file
        * execute the connector with check --config <config_file>
        * Listen to the messages and report the first connection status that comes along.
        * Make sure the subprocess is killed when the function returns.
        """
        with as_temp_files([self._config]) as [config_file]:
            try:
                for msg in self._execute(["check", "--config", config_file]):
                    if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:
                        if msg.connectionStatus.status != Status.FAILED:
                            print(f"Connection check succeeded for `{self.name}`.")
                            log_source_check_result(
                                name=self.name,
                                state=EventState.SUCCEEDED,
                            )
                            return

                        log_source_check_result(
                            name=self.name,
                            state=EventState.FAILED,
                        )
                        raise exc.AirbyteConnectorCheckFailedError(
                            help_url=self.docs_url,
                            context={
                                "failure_reason": msg.connectionStatus.message,
                            },
                        )
                raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages)
            except exc.AirbyteConnectorReadError as ex:
                raise exc.AirbyteConnectorCheckFailedError(
                    message="The connector failed to check the connection.",
                    log_text=ex.log_text,
                ) from ex

    def install(self) -> None:
        """Install the connector if it is not yet installed."""
        self.executor.install()
        print("For configuration instructions, see: \n" f"{self.docs_url}#reference\n")

    def uninstall(self) -> None:
        """Uninstall the connector if it is installed.

        This only works if the use_local_install flag wasn't used and installation is managed by
        PyAirbyte.
        """
        self.executor.uninstall()

    def _read_with_catalog(
        self,
        catalog: ConfiguredAirbyteCatalog,
        state: StateProviderBase | None = None,
    ) -> Iterator[AirbyteMessage]:
        """Call read on the connector.

        This involves the following steps:
        * Write the config to a temporary file
        * execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and return the AirbyteRecordMessages that come along.
        * Send out telemetry on the performed sync (with information about which source was used
          and the type of the cache)
        """
        self._processed_records = 0  # Reset the counter before we start
        with as_temp_files(
            [
                self._config,
                catalog.json(),
                state.to_state_input_file_text() if state else "[]",
            ]
        ) as [
            config_file,
            catalog_file,
            state_file,
        ]:
            yield from self._tally_records(
                self._execute(
                    [
                        "read",
                        "--config",
                        config_file,
                        "--catalog",
                        catalog_file,
                        "--state",
                        state_file,
                    ],
                )
            )

    def _add_to_logs(self, message: str) -> None:
        self._last_log_messages.append(message)
        self._last_log_messages = self._last_log_messages[-10:]

    def _execute(self, args: list[str]) -> Iterator[AirbyteMessage]:
        """Execute the connector with the given arguments.

        This involves the following steps:
        * Locate the right venv. It is called ".venv-<connector_name>"
        * Spawn a subprocess with .venv-<connector_name>/bin/<connector-name> <args>
        * Read the subprocess output line by line and deserialize each line into an
          AirbyteMessage object. Drop the line if it is not valid.
        """
        # Fail early if the connector is not installed.
        self.executor.ensure_installation(auto_fix=False)

        try:
            self._last_log_messages = []
            for line in self.executor.execute(args):
                try:
                    message = AirbyteMessage.parse_raw(line)
                    if message.type is Type.RECORD:
                        self._processed_records += 1
                    if message.type == Type.LOG:
                        self._add_to_logs(message.log.message)
                    if message.type == Type.TRACE and message.trace.type == TraceType.ERROR:
                        self._add_to_logs(message.trace.error.message)
                    yield message
                except Exception:
                    self._add_to_logs(line)
        except Exception as e:
            raise exc.AirbyteConnectorReadError(
                log_text=self._last_log_messages,
            ) from e

    def _tally_records(
        self,
        messages: Iterable[AirbyteMessage],
    ) -> Generator[AirbyteMessage, Any, None]:
        """This method simply tallies the number of records processed and yields the messages."""
        self._processed_records = 0  # Reset the counter before we start
        progress.reset(len(self._selected_stream_names or []))

        for message in messages:
            yield message
            progress.log_records_read(new_total_count=self._processed_records)

    def _log_sync_start(
        self,
        *,
        cache: CacheBase | None,
    ) -> None:
        """Log the start of a sync operation."""
        print(f"Started `{self.name}` read operation at {pendulum.now().format('HH:mm:ss')}...")
        send_telemetry(
            source=self,
            cache=cache,
            state=EventState.STARTED,
            event_type=EventType.SYNC,
        )

    def _log_sync_success(
        self,
        *,
        cache: CacheBase | None,
    ) -> None:
        """Log the success of a sync operation."""
        print(f"Completed `{self.name}` read operation at {pendulum.now().format('HH:mm:ss')}.")
        send_telemetry(
            source=self,
            cache=cache,
            state=EventState.SUCCEEDED,
            number_of_records=self._processed_records,
            event_type=EventType.SYNC,
        )

    def _log_sync_failure(
        self,
        *,
        cache: CacheBase | None,
        exception: Exception,
    ) -> None:
        """Log the failure of a sync operation."""
        print(f"Failed `{self.name}` read operation at {pendulum.now().format('HH:mm:ss')}.")
        send_telemetry(
            state=EventState.FAILED,
            source=self,
            cache=cache,
            number_of_records=self._processed_records,
            exception=exception,
            event_type=EventType.SYNC,
        )

    def read(
        self,
        cache: CacheBase | None = None,
        *,
        streams: str | list[str] | None = None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
    ) -> ReadResult:
        """Read from the connector and write to the cache.

        Args:
            cache: The cache to write to. If None, a default cache will be used.
            write_strategy: The strategy to use when writing to the cache. If a string, it must be
                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
                WriteStrategy.AUTO.
            streams: Optional if already set. A list of stream names to select for reading. If set
                to "*", all streams will be selected.
            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
                streams will be read in incremental mode if supported by the connector. This option
                must be True when using the "replace" strategy.
        """
        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
            warnings.warn(
                message=(
                    "Using `REPLACE` strategy without also setting `force_full_refresh=True` "
                    "could result in data loss. "
                    "To silence this warning, use the following: "
                    'warnings.filterwarnings("ignore", '
                    "category=PyAirbyteDataLossWarning)"
                ),
                category=PyAirbyteDataLossWarning,
                stacklevel=1,
            )
        if isinstance(write_strategy, str):
            try:
                write_strategy = WriteStrategy(write_strategy)
            except ValueError:
                raise exc.PyAirbyteInputError(
                    message="Invalid strategy",
                    context={
                        "write_strategy": write_strategy,
                        "available_strategies": [s.value for s in WriteStrategy],
                    },
                ) from None

        if streams:
            self.select_streams(streams)

        if not self._selected_stream_names:
            raise exc.PyAirbyteNoStreamsSelectedError(
                connector_name=self.name,
                available_streams=self.get_available_streams(),
            )

        # Run optional validation step
        if not skip_validation:
            self.validate_config()

        # Set up cache and related resources
        if cache is None:
            cache = get_default_cache()

        # Set up state provider if not in full refresh mode
        if force_full_refresh:
            state_provider: StateProviderBase | None = None
        else:
            state_provider = cache.get_state_provider(
                source_name=self.name,
            )

        self._log_sync_start(cache=cache)

        cache_processor = cache.get_record_processor(
            source_name=self.name,
            catalog_provider=CatalogProvider(self.configured_catalog),
        )
        try:
            cache_processor.process_airbyte_messages(
                self._read_with_catalog(
                    catalog=self.configured_catalog,
                    state=state_provider,
                ),
                write_strategy=write_strategy,
            )

        # TODO: We should catch more specific exceptions here
        except Exception as ex:
            self._log_sync_failure(cache=cache, exception=ex)
            raise exc.AirbyteConnectorFailedError(
                log_text=self._last_log_messages,
            ) from ex

        self._log_sync_success(cache=cache)
        return ReadResult(
            processed_records=self._processed_records,
            cache=cache,
            processed_streams=[stream.stream.name for stream in self.configured_catalog.streams],
        )


__all__ = [
    "Source",
]
class Source:
A class representing a source that can be called.
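In practice a Source is usually obtained from the airbyte.get_source() factory rather than constructed directly, since the constructor takes an Executor that the factory builds for you. A minimal sketch, assuming the source-faker connector and its count option:

import airbyte as ab

# get_source() resolves the connector, installs it if needed,
# and returns a configured Source instance.
source = ab.get_source(
    "source-faker",
    config={"count": 1_000},  # connector-specific options
)
source.check()  # fail fast if the config or credentials are wrong

The examples in the sections below reuse this `source` object.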
def __init__(
    self,
    executor: Executor,
    name: str,
    config: dict[str, Any] | None = None,
    streams: str | list[str] | None = None,
    *,
    validate: bool = False,
) -> None:
Initialize the source.
If config is provided, it will be validated against the spec if validate is True.
def set_streams(self, streams: list[str]) -> None:
Deprecated. See select_streams().
def select_all_streams(self) -> None:
Select all streams.
This is a more streamlined equivalent to:
source.select_streams(source.get_available_streams()).
def select_streams(self, streams: str | list[str]) -> None:
Select the stream names that should be read from the connector.
Args:
- streams: A list of stream names to select. If set to "*", all streams will be selected.
Currently, if this is not set, all streams will be read.
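For example, with the `source` from the sketch above (the stream names here are hypothetical and depend on the connector):

# Discover what the connector offers, then narrow the selection.
print(source.get_available_streams())  # e.g. ["users", "products", "purchases"]

source.select_streams(["users", "products"])  # select a subset
source.select_streams("users")                # a single name is wrapped in a list
source.select_streams("*")                    # same as source.select_all_streams()

Selecting an unknown name raises AirbyteStreamNotFoundError immediately, before any sync starts.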
def get_selected_streams(self) -> list[str]:
Get the selected streams.
If no streams are selected, return an empty list.
def set_config(
    self,
    config: dict[str, Any],
    *,
    validate: bool = True,
) -> None:
Set the config for the connector.
If validate is True, raise an exception if the config fails validation.
If validate is False, validation will be deferred until check() or validate_config() is called.
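A sketch of deferred validation, reusing the `source` from above (the config keys are illustrative):

draft_config = {"count": 1_000}
source.set_config(draft_config, validate=False)  # stored without a spec check

draft_config["seed"] = 42                        # finish assembling the config
source.set_config(draft_config)                  # validate=True by default; raises if invalid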
def get_config(self) -> dict[str, Any]:
Get the config for the connector.
def validate_config(self, config: dict[str, Any] | None = None) -> None:
Validate the config against the spec.
If config is not provided, the already-set config will be validated.
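Validation is plain JSON Schema checking against the connector spec, so a failing config raises AirbyteConnectorValidationFailedError with the offending path and value in its context. A sketch (the bad value is illustrative):

from airbyte import exceptions as exc

try:
    source.validate_config({"count": "not-a-number"})  # wrong type on purpose
except exc.AirbyteConnectorValidationFailedError as err:
    print(err)  # context includes error_message, error_path, error_instance, error_schema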
def get_available_streams(self) -> list[str]:
Get the available streams from the discovered catalog.
@property
def config_spec(self) -> dict[str, Any]:
Generate a configuration spec for this connector, as a JSON Schema definition.
This function returns a JSON Schema dictionary describing the configuration options for the current connector.
Returns:
dict: The JSON Schema configuration spec as a dictionary.
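Because the result is ordinary JSON Schema, it can be inspected with plain dict access, for example to list the required fields. A sketch:

spec = source.config_spec
print(spec.get("required", []))  # names of required config properties
for name, prop in spec.get("properties", {}).items():
    print(name, prop.get("type"), prop.get("description", ""))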
def print_config_spec(
    self,
    format: Literal["yaml", "json"] = "yaml",  # noqa: A002
    *,
    output_file: Path | str | None = None,
) -> None:
Print the configuration spec for this connector.
Args:
- format: The format to print the spec in. Must be "yaml" or "json".
- output_file: Optional. If set, the spec will be written to the given file path. Otherwise, it will be printed to the console.
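For example:

source.print_config_spec()                                         # YAML to the console
source.print_config_spec(format="json", output_file="spec.json")   # JSON to a file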
@property
def docs_url(self) -> str:
Get the URL to the connector's documentation.
@property
def discovered_catalog(self) -> AirbyteCatalog:
Get the raw catalog for the given streams.
If the catalog is not yet known, we call discover to get it.
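The first access triggers a discover call against the connector; the result is then cached on the instance, so later accesses are free. A sketch of inspecting it (field names follow the Airbyte protocol models):

catalog = source.discovered_catalog  # runs discover on first access only
for stream in catalog.streams:
    modes = stream.supported_sync_modes or []
    print(stream.name, [mode.value for mode in modes])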
@property
def configured_catalog(self) -> ConfiguredAirbyteCatalog:
Get the configured catalog for the given streams.
If the raw catalog is not yet known, we call discover to get it.
If no specific streams are selected, we return a catalog that syncs all available streams.
TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)
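The catalog is rebuilt from the current selection on each access, so changing the selection changes what a subsequent read would sync. A sketch (the stream name is hypothetical):

source.select_streams(["users"])
configured = source.configured_catalog
print([s.stream.name for s in configured.streams])  # ["users"]
# Each entry is configured with sync_mode=incremental and
# destination_sync_mode=overwrite, as noted in the TODO above.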
def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
Return the JSON Schema spec for the specified stream name.
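For instance, a sketch using a hypothetical "users" stream:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100}, install_if_missing=True)

    schema = source.get_stream_json_schema("users")  # Hypothetical stream name.
    print(sorted(schema["properties"].keys()))  # Top-level property names of the stream.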
    def get_records(self, stream: str) -> LazyDataset:
        """Read a stream from the connector.

        This involves the following steps:
        * Call discover to get the catalog
        * Generate a configured catalog that syncs the given stream in full_refresh mode
        * Write the configured catalog and the config to a temporary file
        * Execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and return the first AirbyteRecordMessages that come along.
        * Make sure the subprocess is killed when the function returns.
        """
        discovered_catalog: AirbyteCatalog = self.discovered_catalog
        configured_catalog = ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=s,
                    sync_mode=SyncMode.full_refresh,
                    destination_sync_mode=DestinationSyncMode.overwrite,
                )
                for s in discovered_catalog.streams
                if s.name == stream
            ],
        )
        if len(configured_catalog.streams) == 0:
            raise exc.PyAirbyteInputError(
                message="Requested stream does not exist.",
                context={
                    "stream": stream,
                    "available_streams": self.get_available_streams(),
                    "connector_name": self.name,
                },
            ) from KeyError(stream)

        configured_stream = configured_catalog.streams[0]
        all_properties = cast(
            list[str], list(configured_stream.stream.json_schema["properties"].keys())
        )

        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
            self._log_sync_start(cache=None)
            yield from records
            self._log_sync_success(cache=None)

        iterator: Iterator[dict[str, Any]] = _with_logging(
            records=(  # Generator comprehension yields StreamRecord objects for each record
                StreamRecord.from_record_message(
                    record_message=record.record,
                    expected_keys=all_properties,
                    prune_extra_fields=True,
                )
                for record in self._read_with_catalog(configured_catalog)
                if record.record
            )
        )
        return LazyDataset(
            iterator,
            stream_metadata=configured_stream,
        )
Read a stream from the connector.
This involves the following steps:
- Call discover to get the catalog
- Generate a configured catalog that syncs the given stream in full_refresh mode
- Write the configured catalog and the config to a temporary file
- Execute the connector with read --config <config_file> --catalog <catalog_file>
- Listen to the messages and return the first AirbyteRecordMessages that come along.
- Make sure the subprocess is killed when the function returns.
    @property
    def connector_version(self) -> str | None:
        """Return the version of the connector as reported by the executor.

        Returns None if the version cannot be determined.
        """
        return self.executor.get_installed_version()
Return the version of the connector as reported by the executor.
Returns None if the version cannot be determined.
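A quick sketch, assuming a `source` object as in the earlier examples:

    import airbyte as ab

    source = ab.get_source("source-faker", install_if_missing=True)
    print(source.connector_version or "Version could not be determined.")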
    def get_documents(
        self,
        stream: str,
        title_property: str | None = None,
        content_properties: list[str] | None = None,
        metadata_properties: list[str] | None = None,
        *,
        render_metadata: bool = False,
    ) -> Iterable[Document]:
        """Read a stream from the connector and return the records as documents.

        If metadata_properties is not set, all properties that are not content will be added to
        the metadata.

        If render_metadata is True, metadata will be rendered in the document, as well as the
        main content.
        """
        return self.get_records(stream).to_documents(
            title_property=title_property,
            content_properties=content_properties,
            metadata_properties=metadata_properties,
            render_metadata=render_metadata,
        )
Read a stream from the connector and return the records as documents.
If metadata_properties is not set, all properties that are not content will be added to the metadata.
If render_metadata is True, metadata will be rendered in the document, as well as the main content.
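A sketch of reading a stream as documents; the stream and property names are assumptions for illustration:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100}, install_if_missing=True)

    for document in source.get_documents(
        stream="users",                # Hypothetical stream name.
        title_property="name",         # Hypothetical property names; properties not
        content_properties=["email"],  # listed as content land in the metadata.
        render_metadata=True,
    ):
        print(document)
        break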
    def check(self) -> None:
        """Call check on the connector.

        This involves the following steps:
        * Write the config to a temporary file
        * Execute the connector with check --config <config_file>
        * Listen to the messages and return the connection status when it is received.
        * Make sure the subprocess is killed when the function returns.
        """
        with as_temp_files([self._config]) as [config_file]:
            try:
                for msg in self._execute(["check", "--config", config_file]):
                    if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:
                        if msg.connectionStatus.status != Status.FAILED:
                            print(f"Connection check succeeded for `{self.name}`.")
                            log_source_check_result(
                                name=self.name,
                                state=EventState.SUCCEEDED,
                            )
                            return

                        log_source_check_result(
                            name=self.name,
                            state=EventState.FAILED,
                        )
                        raise exc.AirbyteConnectorCheckFailedError(
                            help_url=self.docs_url,
                            context={
                                "failure_reason": msg.connectionStatus.message,
                            },
                        )
                raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages)
            except exc.AirbyteConnectorReadError as ex:
                raise exc.AirbyteConnectorCheckFailedError(
                    message="The connector failed to check the connection.",
                    log_text=ex.log_text,
                ) from ex
Call check on the connector.
This involves the following steps:
- Write the config to a temporary file
- Execute the connector with check --config <config_file>
- Listen to the messages and return the connection status when it is received.
- Make sure the subprocess is killed when the function returns.
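A usage sketch; a failed check surfaces as AirbyteConnectorCheckFailedError, which the example catches (connector name and config are assumptions):

    import airbyte as ab
    from airbyte import exceptions as exc

    source = ab.get_source("source-faker", config={"count": 100}, install_if_missing=True)

    try:
        source.check()  # Prints a success message if the check passes.
    except exc.AirbyteConnectorCheckFailedError as ex:
        print(f"Check failed: {ex}")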
    def install(self) -> None:
        """Install the connector if it is not yet installed."""
        self.executor.install()
        print("For configuration instructions, see: \n" f"{self.docs_url}#reference\n")
Install the connector if it is not yet installed.
    def uninstall(self) -> None:
        """Uninstall the connector if it is installed.

        This only works if the use_local_install flag wasn't used and installation is managed by
        PyAirbyte.
        """
        self.executor.uninstall()
Uninstall the connector if it is installed.
This only works if the use_local_install flag wasn't used and installation is managed by PyAirbyte.
    def read(
        self,
        cache: CacheBase | None = None,
        *,
        streams: str | list[str] | None = None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
    ) -> ReadResult:
        """Read from the connector and write to the cache.

        Args:
            cache: The cache to write to. If None, a default cache will be used.
            write_strategy: The strategy to use when writing to the cache. If a string, it must be
                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
                WriteStrategy.AUTO.
            streams: Optional if already set. A list of stream names to select for reading. If set
                to "*", all streams will be selected.
            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
                streams will be read in incremental mode if supported by the connector. This option
                must be True when using the "replace" strategy.
            skip_validation: If True, the config will not be validated against the connector spec
                before reading.
        """
        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
            warnings.warn(
                message=(
                    "Using `REPLACE` strategy without also setting `force_full_refresh=True` "
                    "could result in data loss. "
                    "To silence this warning, use the following: "
                    'warnings.filterwarnings("ignore", '
                    'category="airbyte.warnings.PyAirbyteDataLossWarning")'
                ),
                category=PyAirbyteDataLossWarning,
                stacklevel=1,
            )
        if isinstance(write_strategy, str):
            try:
                write_strategy = WriteStrategy(write_strategy)
            except ValueError:
                raise exc.PyAirbyteInputError(
                    message="Invalid strategy",
                    context={
                        "write_strategy": write_strategy,
                        "available_strategies": [s.value for s in WriteStrategy],
                    },
                ) from None

        if streams:
            self.select_streams(streams)

        if not self._selected_stream_names:
            raise exc.PyAirbyteNoStreamsSelectedError(
                connector_name=self.name,
                available_streams=self.get_available_streams(),
            )

        # Run optional validation step
        if not skip_validation:
            self.validate_config()

        # Set up cache and related resources
        if cache is None:
            cache = get_default_cache()

        # Set up state provider if not in full refresh mode
        if force_full_refresh:
            state_provider: StateProviderBase | None = None
        else:
            state_provider = cache.get_state_provider(
                source_name=self.name,
            )

        self._log_sync_start(cache=cache)

        cache_processor = cache.get_record_processor(
            source_name=self.name,
            catalog_provider=CatalogProvider(self.configured_catalog),
        )
        try:
            cache_processor.process_airbyte_messages(
                self._read_with_catalog(
                    catalog=self.configured_catalog,
                    state=state_provider,
                ),
                write_strategy=write_strategy,
            )

        # TODO: We should catch more specific exceptions here
        except Exception as ex:
            self._log_sync_failure(cache=cache, exception=ex)
            raise exc.AirbyteConnectorFailedError(
                log_text=self._last_log_messages,
            ) from ex

        self._log_sync_success(cache=cache)
        return ReadResult(
            processed_records=self._processed_records,
            cache=cache,
            processed_streams=[stream.stream.name for stream in self.configured_catalog.streams],
        )
Read from the connector and write to the cache.
Arguments:
- cache: The cache to write to. If None, a default cache will be used.
- write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
- streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
- force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
- skip_validation: If True, the config will not be validated against the connector spec before reading.
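Putting it together, a sketch of a full read; the connector name, config, and stream contents are assumptions for illustration:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100}, install_if_missing=True)
    source.select_all_streams()

    # Incremental read into the default cache with the AUTO write strategy:
    result = source.read()
    print(result.processed_records)

    # A full rewrite requires pairing "replace" with force_full_refresh=True
    # to avoid the data-loss warning:
    result = source.read(write_strategy="replace", force_full_refresh=True)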