airbyte.mcp.local_ops
Local MCP operations.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Local MCP operations."""

import sys
import traceback
from itertools import islice
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Any, Literal

from fastmcp import FastMCP
from pydantic import BaseModel, Field

from airbyte import get_source
from airbyte._util.meta import is_docker_installed
from airbyte.caches.util import get_default_cache
from airbyte.mcp._util import resolve_config, resolve_list_of_strings
from airbyte.secrets.config import _get_secret_sources
from airbyte.secrets.env_vars import DotenvSecretManager
from airbyte.secrets.google_gsm import GoogleGSMSecretManager
from airbyte.sources.base import Source
from airbyte.sources.registry import get_connector_metadata


if TYPE_CHECKING:
    from airbyte.caches.duckdb import DuckDBCache


_CONFIG_HELP = """
You can provide `config` as JSON or a Path to a YAML/JSON file.
If a `dict` is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
`secret_reference::ENV_VAR_NAME`.

You can also provide a `config_secret_name` to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.

If `config_secret_name` is provided, it should point to a string
that contains valid JSON or YAML.

If both `config` and `config_secret_name` are provided, the
`config` will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.

For declarative connectors, you can provide a `manifest_path` to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
"""


def _get_mcp_source(
    connector_name: str,
    override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto",
    *,
    install_if_missing: bool = True,
    manifest_path: str | Path | None,
) -> Source:
    """Get the MCP source for a connector."""
    if manifest_path:
        override_execution_mode = "yaml"
    elif override_execution_mode == "auto" and is_docker_installed():
        override_execution_mode = "docker"

    source: Source
    if override_execution_mode == "auto":
        # Use defaults with no overrides
        source = get_source(
            connector_name,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "python":
        source = get_source(
            connector_name,
            use_python=True,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "docker":
        source = get_source(
            connector_name,
            docker_image=True,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "yaml":
        source = get_source(
            connector_name,
            source_manifest=manifest_path or True,
            install_if_missing=False,
        )
    else:
        raise ValueError(
            f"Unknown execution method: {override_execution_mode}. "
            "Expected one of: ['auto', 'docker', 'python', 'yaml']."
        )

    # Ensure installed:
    if install_if_missing:
        source.executor.ensure_installation()

    return source


# @app.tool() # << deferred
def validate_connector_config(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to validate."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the connector as a dict object or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the connector configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> tuple[bool, str]:
    """Validate a connector configuration.

    Returns a tuple of (is_valid: bool, message: str).
    """
    try:
        source: Source = _get_mcp_source(
            connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
    except Exception as ex:
        return False, f"Failed to get connector '{connector_name}': {ex}"

    try:
        config_dict = resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
    except Exception as ex:
        return False, f"Failed to resolve configuration for {connector_name}: {ex}"

    try:
        source.check()
    except Exception as ex:
        return False, f"Configuration for {connector_name} is invalid: {ex}"

    return True, f"Configuration for {connector_name} is valid!"


# @app.tool() # << deferred
def list_connector_config_secrets(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector."),
    ],
) -> list[str]:
    """List all `config_secret_name` options that are known for the given connector.

    This can be used to find out which already-created config secret names are available
    for a given connector. The return value is a list of secret names, but it will not
    return the actual secret values.
    """
    secrets_names: list[str] = []
    for secrets_mgr in _get_secret_sources():
        if isinstance(secrets_mgr, GoogleGSMSecretManager):
            secrets_names.extend(
                [
                    secret_handle.secret_name.split("/")[-1]
                    for secret_handle in secrets_mgr.fetch_connector_secrets(connector_name)
                ]
            )

    return secrets_names


def list_dotenv_secrets() -> dict[str, list[str]]:
    """List all environment variable names declared within the configured .env files.

    This returns a dictionary mapping the .env file name to a list of environment
    variable names. The values of the environment variables are not returned.
    """
    result: dict[str, list[str]] = {}
    for secrets_mgr in _get_secret_sources():
        if isinstance(secrets_mgr, DotenvSecretManager) and secrets_mgr.dotenv_path:
            result[str(secrets_mgr.dotenv_path.resolve())] = secrets_mgr.list_secrets_names()

    return result


# @app.tool() # << deferred
def list_source_streams(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[str]:
    """List all streams available in a source connector.

    This operation (generally) requires a valid configuration, including any required secrets.
    """
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    return source.get_available_streams()


# @app.tool() # << deferred
def get_source_stream_json_schema(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Return the JSON schema for a specific stream in a source connector."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    return source.get_stream_json_schema(stream_name=stream_name)


# @app.tool() # << deferred
def read_source_stream_records(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to read records from."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="The maximum number of records to read.",
            default=1000,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[dict[str, Any]] | str:
    """Get records from a source connector."""
    try:
        source: Source = _get_mcp_source(
            connector_name=source_connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
        config_dict = resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
        # First we get a generator for the records in the specified stream.
        record_generator = source.get_records(stream_name)
        # Next we load a limited number of records from the generator into our list.
        records: list[dict[str, Any]] = list(islice(record_generator, max_records))

        print(f"Retrieved {len(records)} records from stream '{stream_name}'", file=sys.stderr)

    except Exception as ex:
        tb_str = traceback.format_exc()
        # If any error occurs, return the error message and traceback as a string.
        return (
            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
        )

    else:
        return records


# @app.tool() # << deferred
def get_stream_previews(
    source_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str | None,
        Field(
            description=(
                "The streams to get previews for. "
                "Use '*' for all streams, or None for selected streams."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="The maximum number of sample records to return per stream.",
            default=10,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, list[dict[str, Any]] | str]:
    """Get sample records (previews) from streams in a source connector.

    This operation requires a valid configuration, including any required secrets.
    Returns a dictionary mapping stream names to lists of sample records, or an error
    message string if an error occurred for that stream.
    """
    source: Source = _get_mcp_source(
        connector_name=source_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )

    config_dict = resolve_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)

    streams_param: list[str] | Literal["*"] | None = resolve_list_of_strings(streams)
    if streams_param and len(streams_param) == 1 and streams_param[0] == "*":
        streams_param = "*"

    try:
        samples_result = source.get_samples(
            streams=streams_param,
            limit=limit,
            on_error="ignore",
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return {
            "ERROR": f"Error getting stream previews from source '{source_name}': "
            f"{ex!r}, {ex!s}\n{tb_str}"
        }

    result: dict[str, list[dict[str, Any]] | str] = {}
    for stream_name, dataset in samples_result.items():
        if dataset is None:
            result[stream_name] = f"Could not retrieve stream samples for stream '{stream_name}'"
        else:
            result[stream_name] = list(dataset)

    return result


# @app.tool() # << deferred
def sync_source_to_cache(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str,
        Field(
            description="The streams to sync.",
            default="suggested",
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> str:
    """Run a sync from a source connector to the default DuckDB cache."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    cache = get_default_cache()

    streams = resolve_list_of_strings(streams)
    if streams and len(streams) == 1 and streams[0] in {"*", "suggested"}:
        # Float '*' and 'suggested' to the top-level for special processing:
        streams = streams[0]

    if isinstance(streams, str) and streams == "suggested":
        streams = "*"  # Start from all streams; narrowed below if suggested streams are found.
        try:
            metadata = get_connector_metadata(
                source_connector_name,
            )
        except Exception:
            streams = "*"  # Fallback to all streams if suggested streams fail.
        else:
            if metadata is not None:
                streams = metadata.suggested_streams or "*"

    if isinstance(streams, str) and streams != "*":
        streams = [streams]  # Ensure streams is a list

    source.read(
        cache=cache,
        streams=streams,
    )
    del cache  # Ensure the cache is closed properly

    summary: str = f"Sync completed for '{source_connector_name}'!\n\n"
    summary += "Data written to default DuckDB cache\n"
    return summary


class CachedDatasetInfo(BaseModel):
    """Class to hold information about a cached dataset."""

    stream_name: str
    """The name of the stream in the cache."""
    table_name: str
    schema_name: str | None = None


# @app.tool() # << deferred
def list_cached_streams() -> list[CachedDatasetInfo]:
    """List all streams available in the default DuckDB cache."""
    cache: DuckDBCache = get_default_cache()
    result = [
        CachedDatasetInfo(
            stream_name=stream_name,
            table_name=(cache.table_prefix or "") + stream_name,
            schema_name=cache.schema_name,
        )
        for stream_name in cache.streams
    ]
    del cache  # Ensure the cache is closed properly
    return result


# @app.tool() # << deferred
def describe_default_cache() -> dict[str, Any]:
    """Describe the currently configured default cache."""
    cache = get_default_cache()
    return {
        "cache_type": type(cache).__name__,
        "cache_dir": str(cache.cache_dir),
        "cache_db_path": str(Path(cache.db_path).absolute()),
        "cached_streams": list(cache.streams.keys()),
    }


def _is_safe_sql(sql_query: str) -> bool:
    """Check if a SQL query is safe to execute.

    For security reasons, we only allow read-only operations like SELECT, DESCRIBE, and SHOW.
    Multi-statement queries (containing semicolons) are also disallowed for security.

    Note: SQLAlchemy will also validate downstream, but this is a first-pass check.

    Args:
        sql_query: The SQL query to check

    Returns:
        True if the query is safe to execute, False otherwise
    """
    # Remove leading/trailing whitespace and convert to uppercase for checking
    normalized_query = sql_query.strip().upper()

    # Disallow multi-statement queries (containing semicolons)
    # Note: We check the original query to catch semicolons anywhere, including in comments
    if ";" in sql_query:
        return False

    # List of allowed SQL statement prefixes (read-only operations)
    allowed_prefixes = (
        "SELECT",
        "DESCRIBE",
        "DESC",  # Short form of DESCRIBE
        "SHOW",
        "EXPLAIN",  # Also safe - shows query execution plan
    )

    # Check if the query starts with any allowed prefix
    return any(normalized_query.startswith(prefix) for prefix in allowed_prefixes)


# @app.tool() # << deferred
def run_sql_query(
    sql_query: Annotated[
        str,
        Field(description="The SQL query to execute."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="Maximum number of records to return.",
            default=1000,
        ),
    ],
) -> list[dict[str, Any]]:
    """Run a SQL query against the default cache.

    The dialect of SQL should match the dialect of the default cache.
    Use `describe_default_cache` to see the cache type.

    For DuckDB-type caches:
    - Use `SHOW TABLES` to list all tables.
    - Use `DESCRIBE <table_name>` to get the schema of a specific table.

    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
    """
    # Check if the query is safe to execute
    if not _is_safe_sql(sql_query):
        return [
            {
                "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
                "SELECT, DESCRIBE, SHOW, EXPLAIN",
                "SQL_QUERY": sql_query,
            }
        ]

    cache: DuckDBCache = get_default_cache()
    try:
        return cache.run_sql_query(
            sql_query,
            max_records=max_records,
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return [
            {
                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
                "TRACEBACK": tb_str,
                "SQL_QUERY": sql_query,
            }
        ]
    finally:
        del cache  # Ensure the cache is closed properly


def register_local_ops_tools(app: FastMCP) -> None:
    """@private Register tools with the FastMCP app.

    This is an internal function and should not be called directly.
    """
    app.tool(list_connector_config_secrets)
    for tool in (
        describe_default_cache,
        get_source_stream_json_schema,
        get_stream_previews,
        list_cached_streams,
        list_dotenv_secrets,
        list_source_streams,
        read_source_stream_records,
        run_sql_query,
        sync_source_to_cache,
        validate_connector_config,
    ):
        # Register each tool with the FastMCP app.
        app.tool(
            tool,
            description=(tool.__doc__ or "").rstrip() + "\n" + _CONFIG_HELP,
        )
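For illustration, a configuration following the conventions described in _CONFIG_HELP might look like the sketch below. The connector fields and the POSTGRES_PASSWORD environment variable are hypothetical; the only convention taken from this module is the `secret_reference::` prefix.

# Hypothetical source config: no hardcoded secrets; the password is
# resolved from the POSTGRES_PASSWORD environment variable at runtime.
config = {
    "host": "db.example.com",
    "port": 5432,
    "username": "reader",
    "password": "secret_reference::POSTGRES_PASSWORD",
}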
def validate_connector_config(connector_name, config, config_file, config_secret_name, override_execution_mode, manifest_path) -> tuple[bool, str]
Validate a connector configuration.
Returns a tuple of (is_valid: bool, message: str).
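A minimal usage sketch, assuming the source-faker connector and its count option are available in your environment. Because the Field defaults live in the Annotated metadata rather than in the Python signature, a direct (non-MCP) call passes every parameter explicitly:

from airbyte.mcp.local_ops import validate_connector_config

is_valid, message = validate_connector_config(
    connector_name="source-faker",  # assumed to be installed or installable
    config={"count": 100},          # assumed source-faker option
    config_file=None,
    config_secret_name=None,
    override_execution_mode="auto",
    manifest_path=None,
)
print(is_valid, message)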
def list_connector_config_secrets(connector_name) -> list[str]
List all `config_secret_name` options that are known for the given connector.
This can be used to find out which already-created config secret names are available for a given connector. The return value is a list of secret names, but it will not return the actual secret values.
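A short sketch, assuming a GoogleGSMSecretManager is registered as a secret source and that secrets exist for the (hypothetical) connector name:

from airbyte.mcp.local_ops import list_connector_config_secrets

# Returns secret names only (the last path segment of each GSM secret),
# never the secret values themselves.
names = list_connector_config_secrets(connector_name="source-github")
print(names)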
def list_dotenv_secrets() -> dict[str, list[str]]
List all environment variable names declared within the configured .env files.
This returns a dictionary mapping the .env file name to a list of environment variable names. The values of the environment variables are not returned.
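A sketch of the return shape, assuming at least one DotenvSecretManager with a dotenv_path is registered:

from airbyte.mcp.local_ops import list_dotenv_secrets

for dotenv_file, var_names in list_dotenv_secrets().items():
    print(dotenv_file, var_names)  # variable names only; values are never returned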
def list_source_streams(source_connector_name, config, config_file, config_secret_name, override_execution_mode, manifest_path) -> list[str]
List all streams available in a source connector.
This operation (generally) requires a valid configuration, including any required secrets.
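A usage sketch under the same source-faker assumption as above:

from airbyte.mcp.local_ops import list_source_streams

streams = list_source_streams(
    source_connector_name="source-faker",
    config={"count": 100},  # assumed source-faker option
    config_file=None,
    config_secret_name=None,
    override_execution_mode="auto",
    manifest_path=None,
)
print(streams)  # list of available stream names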
def get_source_stream_json_schema(source_connector_name, stream_name, config, config_file, config_secret_name, override_execution_mode, manifest_path) -> dict[str, Any]
Return the JSON schema for a specific stream in a source connector.
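A sketch, assuming source-faker exposes a users stream (stream names vary by connector; use list_source_streams to discover them):

from airbyte.mcp.local_ops import get_source_stream_json_schema

schema = get_source_stream_json_schema(
    source_connector_name="source-faker",
    stream_name="users",  # assumed stream name
    config={"count": 100},
    config_file=None,
    config_secret_name=None,
    override_execution_mode="auto",
    manifest_path=None,
)
print(sorted(schema.get("properties", {})))  # top-level property names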
def read_source_stream_records(source_connector_name, config, config_file, config_secret_name, *, stream_name, max_records, override_execution_mode, manifest_path) -> list[dict[str, Any]] | str
Get records from a source connector.
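Note the return type: on failure this tool returns the error text (with traceback) as a string rather than raising. A sketch under the same assumptions as above; stream_name and max_records are keyword-only:

from airbyte.mcp.local_ops import read_source_stream_records

result = read_source_stream_records(
    source_connector_name="source-faker",
    config={"count": 100},
    config_file=None,
    config_secret_name=None,
    stream_name="users",  # assumed stream name
    max_records=10,
    override_execution_mode="auto",
    manifest_path=None,
)
if isinstance(result, str):
    print("Error:", result)  # error text, not an exception
else:
    print(f"Read {len(result)} records")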
def get_stream_previews(source_name, config, config_file, config_secret_name, streams, limit, override_execution_mode, manifest_path) -> dict[str, list[dict[str, Any]] | str]
Get sample records (previews) from streams in a source connector.
This operation requires a valid configuration, including any required secrets. Returns a dictionary mapping stream names to lists of sample records, or an error message string if an error occurred for that stream.
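A sketch requesting previews for all streams; per-stream failures come back as strings because the underlying get_samples call uses on_error="ignore":

from airbyte.mcp.local_ops import get_stream_previews

previews = get_stream_previews(
    source_name="source-faker",  # assumed connector
    config={"count": 100},
    config_file=None,
    config_secret_name=None,
    streams="*",  # all streams
    limit=5,
    override_execution_mode="auto",
    manifest_path=None,
)
for stream, sample in previews.items():
    # Each value is a list of records, or an error string for that stream.
    print(stream, sample if isinstance(sample, str) else len(sample))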
def sync_source_to_cache(source_connector_name, config, config_file, config_secret_name, streams, override_execution_mode, manifest_path) -> str
Run a sync from a source connector to the default DuckDB cache.
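A sketch under the same source-faker assumption; with streams="suggested", the tool resolves the connector's suggested streams from registry metadata and falls back to '*' (all streams) when none are available:

from airbyte.mcp.local_ops import sync_source_to_cache

summary = sync_source_to_cache(
    source_connector_name="source-faker",
    config={"count": 100},
    config_file=None,
    config_secret_name=None,
    streams="suggested",
    override_execution_mode="auto",
    manifest_path=None,
)
print(summary)  # "Sync completed ..." plus the cache location note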
class CachedDatasetInfo(BaseModel):
    stream_name: str
    table_name: str
    schema_name: str | None = None
Class to hold information about a cached dataset.
def list_cached_streams() -> list[CachedDatasetInfo]
List all streams available in the default DuckDB cache.
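A sketch; table_name is the stream name with any configured table prefix applied:

from airbyte.mcp.local_ops import list_cached_streams

for info in list_cached_streams():
    print(info.schema_name, info.table_name, info.stream_name)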
def describe_default_cache() -> dict[str, Any]
Describe the currently configured default cache.
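A sketch of the returned dictionary; the keys are taken from the implementation above:

from airbyte.mcp.local_ops import describe_default_cache

cache_info = describe_default_cache()
print(cache_info["cache_type"])      # e.g. "DuckDBCache"
print(cache_info["cache_db_path"])   # absolute path to the cache database file
print(cache_info["cached_streams"])  # stream names present in the cache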
def run_sql_query(sql_query, max_records) -> list[dict[str, Any]]
Run a SQL query against the default cache.
The dialect of SQL should match the dialect of the default cache.
Use `describe_default_cache` to see the cache type.

For DuckDB-type caches:
- Use `SHOW TABLES` to list all tables.
- Use `DESCRIBE <table_name>` to get the schema of a specific table.
For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
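A sketch against a hypothetical users table in the default cache. Note that the safety check rejects any semicolon, so multi-statement queries (and even trailing semicolons) are refused:

from airbyte.mcp.local_ops import run_sql_query

rows = run_sql_query(sql_query="SELECT COUNT(*) AS n FROM users", max_records=1000)
print(rows)

# A write attempt is refused before execution:
blocked = run_sql_query(sql_query="DROP TABLE users", max_records=10)
# -> [{"ERROR": "Unsafe SQL query detected. ...", "SQL_QUERY": "DROP TABLE users"}]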