airbyte.mcp.local
Local MCP operations.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Local MCP operations."""

import sys
import traceback
from itertools import islice
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Any, Literal

from fastmcp import FastMCP
from fastmcp_extensions import mcp_tool, register_mcp_tools
from pydantic import BaseModel, Field

from airbyte import get_source
from airbyte._util.destination_smoke_tests import (
    DestinationSmokeTestResult,
    run_destination_smoke_test,
)
from airbyte._util.meta import is_docker_installed
from airbyte.caches.util import get_default_cache
from airbyte.destinations.util import get_destination
from airbyte.mcp._arg_resolvers import resolve_connector_config, resolve_list_of_strings
from airbyte.registry import get_connector_metadata
from airbyte.secrets.config import _get_secret_sources
from airbyte.secrets.env_vars import DotenvSecretManager
from airbyte.secrets.google_gsm import GoogleGSMSecretManager
from airbyte.sources.base import Source


if TYPE_CHECKING:
    from airbyte.caches.duckdb import DuckDBCache


# Shared help text appended to the descriptions of config-accepting MCP tools.
_CONFIG_HELP = """
You can provide `config` as JSON or a Path to a YAML/JSON file.
If a `dict` is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
`secret_reference::ENV_VAR_NAME`.

You can also provide a `config_secret_name` to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.

If `config_secret_name` is provided, it should point to a string
that contains valid JSON or YAML.

If both `config` and `config_secret_name` are provided, the
`config` will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.

For declarative connectors, you can provide a `manifest_path` to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
"""


def _get_mcp_source(
    connector_name: str,
    override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto",
    *,
    install_if_missing: bool = True,
    manifest_path: str | Path | None,
) -> Source:
    """Get the MCP source for a connector.

    A `manifest_path` forces yaml execution; otherwise "auto" prefers Docker
    when it is available.
    """
    if manifest_path:
        override_execution_mode = "yaml"
    elif override_execution_mode == "auto" and is_docker_installed():
        override_execution_mode = "docker"

    source: Source
    if override_execution_mode == "auto":
        # Use defaults with no overrides
        source = get_source(
            connector_name,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "python":
        source = get_source(
            connector_name,
            use_python=True,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "docker":
        source = get_source(
            connector_name,
            docker_image=True,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "yaml":
        # `True` means "resolve the manifest from the registry" when no local path is given.
        source = get_source(
            connector_name,
            source_manifest=manifest_path or True,
            install_if_missing=False,
        )
    else:
        raise ValueError(
            f"Unknown execution method: {override_execution_mode}. "
            "Expected one of: ['auto', 'docker', 'python', 'yaml']."
        )

    # Ensure installed:
    if install_if_missing:
        source.executor.ensure_installation()

    return source


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def validate_connector_config(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to validate."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the connector as a dict object or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the connector configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> tuple[bool, str]:
    """Validate a connector configuration.

    Returns a tuple of (is_valid: bool, message: str).
    """
    try:
        source: Source = _get_mcp_source(
            connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
    except Exception as ex:
        return False, f"Failed to get connector '{connector_name}': {ex}"

    try:
        config_dict = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
    except Exception as ex:
        return False, f"Failed to resolve configuration for {connector_name}: {ex}"

    try:
        source.check()
    except Exception as ex:
        return False, f"Configuration for {connector_name} is invalid: {ex}"

    return True, f"Configuration for {connector_name} is valid!"


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def list_connector_config_secrets(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector."),
    ],
) -> list[str]:
    """List all `config_secret_name` options that are known for the given connector.

    This can be used to find out which already-created config secret names are available
    for a given connector. The return value is a list of secret names, but it will not
    return the actual secret values.
    """
    secrets_names: list[str] = []
    for secrets_mgr in _get_secret_sources():
        # Only GSM-backed managers can enumerate per-connector secrets.
        if isinstance(secrets_mgr, GoogleGSMSecretManager):
            secrets_names.extend(
                [
                    secret_handle.secret_name.split("/")[-1]
                    for secret_handle in secrets_mgr.fetch_connector_secrets(connector_name)
                ]
            )

    return secrets_names


# NOTE(review): `_CONFIG_HELP` is attached here although this tool takes no
# config arguments — confirm intentional.
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_dotenv_secrets() -> dict[str, list[str]]:
    """List all environment variable names declared within the configured .env files.

    This returns a dictionary mapping the .env file name to a list of environment
    variable names. The values of the environment variables are not returned.
    """
    result: dict[str, list[str]] = {}
    for secrets_mgr in _get_secret_sources():
        if isinstance(secrets_mgr, DotenvSecretManager) and secrets_mgr.dotenv_path:
            result[str(secrets_mgr.dotenv_path.resolve())] = secrets_mgr.list_secrets_names()

    return result


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_source_streams(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[str]:
    """List all streams available in a source connector.

    This operation (generally) requires a valid configuration, including any required secrets.
    """
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    return source.get_available_streams()


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def get_source_stream_json_schema(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """List all properties for a specific stream in a source connector."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    return source.get_stream_json_schema(stream_name=stream_name)


@mcp_tool(
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def read_source_stream_records(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to read records from."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="The maximum number of records to read.",
            default=1000,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[dict[str, Any]] | str:
    """Get records from a source connector.

    Returns the list of records on success, or a formatted error string
    (with traceback) on failure.
    """
    try:
        source: Source = _get_mcp_source(
            connector_name=source_connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
        config_dict = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
        # First we get a generator for the records in the specified stream.
        record_generator = source.get_records(stream_name)
        # Next we load a limited number of records from the generator into our list.
        records: list[dict[str, Any]] = list(islice(record_generator, max_records))

        # BUGFIX: `sys.stderr` was previously passed as a positional argument,
        # printing the message (and the stderr object's repr) to stdout — which
        # is reserved for MCP protocol traffic. Use the `file=` keyword instead.
        print(
            f"Retrieved {len(records)} records from stream '{stream_name}'",
            file=sys.stderr,
        )

    except Exception as ex:
        tb_str = traceback.format_exc()
        # If any error occurs, we return the error message and traceback as a string.
        return (
            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
        )

    else:
        return records


@mcp_tool(
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def get_stream_previews(
    source_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str | None,
        Field(
            description=(
                "The streams to get previews for. "
                "Use '*' for all streams, or None for selected streams."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="The maximum number of sample records to return per stream.",
            default=10,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, list[dict[str, Any]] | str]:
    """Get sample records (previews) from streams in a source connector.

    This operation requires a valid configuration, including any required secrets.
    Returns a dictionary mapping stream names to lists of sample records, or an error
    message string if an error occurred for that stream.
    """
    source: Source = _get_mcp_source(
        connector_name=source_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )

    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)

    streams_param: list[str] | Literal["*"] | None = resolve_list_of_strings(
        streams
    )  # pyrefly: ignore[no-matching-overload]
    # A single-element ['*'] collapses to the literal "*" sentinel.
    if streams_param and len(streams_param) == 1 and streams_param[0] == "*":
        streams_param = "*"

    try:
        samples_result = source.get_samples(
            streams=streams_param,
            limit=limit,
            on_error="ignore",
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return {
            "ERROR": f"Error getting stream previews from source '{source_name}': "
            f"{ex!r}, {ex!s}\n{tb_str}"
        }

    result: dict[str, list[dict[str, Any]] | str] = {}
    for stream_name, dataset in samples_result.items():
        if dataset is None:
            result[stream_name] = f"Could not retrieve stream samples for stream '{stream_name}'"
        else:
            result[stream_name] = list(dataset)

    return result


@mcp_tool(
    destructive=False,
    extra_help_text=_CONFIG_HELP,
)
def sync_source_to_cache(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str,
        Field(
            description="The streams to sync.",
            default="suggested",
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> str:
    """Run a sync from a source connector to the default DuckDB cache."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    cache = get_default_cache()

    streams = resolve_list_of_strings(streams)
    if streams and len(streams) == 1 and streams[0] in {"*", "suggested"}:
        # Float '*' and 'suggested' to the top-level for special processing:
        streams = streams[0]

    if isinstance(streams, str) and streams == "suggested":
        streams = "*"  # Default to all streams if 'suggested' is not otherwise specified.
        try:
            metadata = get_connector_metadata(
                source_connector_name,
            )
        except Exception:
            streams = "*"  # Fallback to all streams if suggested streams fail.
        else:
            if metadata is not None:
                streams = metadata.suggested_streams or "*"

    if isinstance(streams, str) and streams != "*":
        streams = [streams]  # Ensure streams is a list

    source.read(
        cache=cache,
        streams=streams,
    )
    del cache  # Ensure the cache is closed properly

    summary: str = f"Sync completed for '{source_connector_name}'!\n\n"
    summary += "Data written to default DuckDB cache\n"
    return summary


class CachedDatasetInfo(BaseModel):
    """Class to hold information about a cached dataset."""

    stream_name: str
    """The name of the stream in the cache."""
    table_name: str
    schema_name: str | None = None


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_cached_streams() -> list[CachedDatasetInfo]:
    """List all streams available in the default DuckDB cache."""
    cache: DuckDBCache = get_default_cache()
    result = [
        CachedDatasetInfo(
            stream_name=stream_name,
            table_name=(cache.table_prefix or "") + stream_name,
            schema_name=cache.schema_name,
        )
        for stream_name in cache.streams
    ]
    del cache  # Ensure the cache is closed properly
    return result


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def describe_default_cache() -> dict[str, Any]:
    """Describe the currently configured default cache."""
    cache = get_default_cache()
    return {
        "cache_type": type(cache).__name__,
        "cache_dir": str(cache.cache_dir),
        "cache_db_path": str(Path(cache.db_path).absolute()),
        "cached_streams": list(cache.streams.keys()),
    }


def _is_safe_sql(sql_query: str) -> bool:
    """Check if a SQL query is safe to execute.

    For security reasons, we only allow read-only operations like SELECT, DESCRIBE, and SHOW.
    Multi-statement queries (containing semicolons) are also disallowed for security.

    Note: SQLAlchemy will also validate downstream, but this is a first-pass check.

    Args:
        sql_query: The SQL query to check

    Returns:
        True if the query is safe to execute, False otherwise
    """
    # Remove leading/trailing whitespace and convert to uppercase for checking
    normalized_query = sql_query.strip().upper()

    # Disallow multi-statement queries (containing semicolons)
    # Note: We check the original query to catch semicolons anywhere, including in comments
    if ";" in sql_query:
        return False

    # List of allowed SQL statement prefixes (read-only operations)
    allowed_prefixes = (
        "SELECT",
        "DESCRIBE",
        "DESC",  # Short form of DESCRIBE
        "SHOW",
        "EXPLAIN",  # Also safe - shows query execution plan
    )

    # Check if the query starts with any allowed prefix
    return any(normalized_query.startswith(prefix) for prefix in allowed_prefixes)


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def run_sql_query(
    sql_query: Annotated[
        str,
        Field(description="The SQL query to execute."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="Maximum number of records to return.",
            default=1000,
        ),
    ],
) -> list[dict[str, Any]]:
    """Run a SQL query against the default cache.

    The dialect of SQL should match the dialect of the default cache.
    Use `describe_default_cache` to see the cache type.

    For DuckDB-type caches:
    - Use `SHOW TABLES` to list all tables.
    - Use `DESCRIBE <table_name>` to get the schema of a specific table

    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
    """
    # Check if the query is safe to execute
    if not _is_safe_sql(sql_query):
        return [
            {
                "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
                "SELECT, DESCRIBE, SHOW, EXPLAIN",
                "SQL_QUERY": sql_query,
            }
        ]

    cache: DuckDBCache = get_default_cache()
    try:
        return cache.run_sql_query(
            sql_query,
            max_records=max_records,
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return [
            {
                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
                "TRACEBACK": tb_str,
                "SQL_QUERY": sql_query,
            }
        ]
    finally:
        del cache  # Ensure the cache is closed properly


@mcp_tool(
    destructive=True,
)
def destination_smoke_test(  # noqa: PLR0913, PLR0917
    destination_connector_name: Annotated[
        str,
        Field(
            description=(
                "The name of the destination connector to test "
                "(e.g. 'destination-snowflake', 'destination-motherduck')."
            ),
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description=(
                "The destination configuration as a dict object or JSON string. "
                "Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead."
            ),
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the destination configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the destination configuration.",
            default=None,
        ),
    ],
    scenarios: Annotated[
        list[str] | str,
        Field(
            description=(
                "Which scenarios to run. Use 'fast' (default) for all fast predefined "
                "scenarios (excludes large_batch_stream), 'all' for every predefined "
                "scenario including large batch, or provide a list of scenario names "
                "or a comma-separated string."
            ),
            default="fast",
        ),
    ],
    custom_scenarios: Annotated[
        list[dict[str, Any]] | None,
        Field(
            description=(
                "Additional custom test scenarios to inject. Each scenario should define "
                "'name', 'json_schema', and optionally 'records' and 'primary_key'. "
                "These are unioned with the predefined scenarios."
            ),
            default=None,
        ),
    ],
    docker_image: Annotated[
        str | None,
        Field(
            description=(
                "Optional Docker image override for the destination connector "
                "(e.g. 'airbyte/destination-snowflake:3.14.0')."
            ),
            default=None,
        ),
    ],
    namespace_suffix: Annotated[
        str | None,
        Field(
            description=(
                "Optional suffix appended to the auto-generated namespace. "
                "Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). "
                "Use this to distinguish concurrent runs."
            ),
            default=None,
        ),
    ],
    reuse_namespace: Annotated[
        str | None,
        Field(
            description=(
                "Exact namespace to reuse from a previous run. "
                "When set, no new namespace is generated. "
                "Useful for running a second test against an already-populated namespace."
            ),
            default=None,
        ),
    ],
) -> DestinationSmokeTestResult:
    """Run smoke tests against a destination connector.

    Sends synthetic test data from the smoke test source to the specified
    destination and reports success or failure. The smoke test source generates
    data across predefined scenarios covering common destination failure patterns:
    type variations, null handling, naming edge cases, schema variations, and
    batch sizes.

    This tool does NOT read back data from the destination or compare results.
    It only verifies that the destination accepts the data without errors.
    """
    # Resolve destination config
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
    )

    # Set up destination
    destination_kwargs: dict[str, Any] = {
        "name": destination_connector_name,
        "config": config_dict,
    }
    if docker_image:
        destination_kwargs["docker_image"] = docker_image
    elif is_docker_installed():
        destination_kwargs["docker_image"] = True

    destination_obj = get_destination(**destination_kwargs)

    # Resolve scenarios for the shared helper
    resolved_scenarios: str | list[str]
    if isinstance(scenarios, str):
        resolved_scenarios = scenarios
    else:
        resolved_scenarios = resolve_list_of_strings(scenarios) or "fast"

    return run_destination_smoke_test(
        destination=destination_obj,
        scenarios=resolved_scenarios,
        namespace_suffix=namespace_suffix,
        reuse_namespace=reuse_namespace,
        custom_scenarios=custom_scenarios,
    )


def register_local_tools(app: FastMCP) -> None:
    """Register local tools with the FastMCP app.

    Args:
        app: FastMCP application instance
    """
    register_mcp_tools(app, mcp_module=__name__)
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def validate_connector_config(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to validate."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the connector as a dict object or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the connector configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> tuple[bool, str]:
    """Validate a connector configuration.

    Returns a tuple of (is_valid: bool, message: str).
    """
    # Step 1: locate/instantiate the connector.
    try:
        connector: Source = _get_mcp_source(
            connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
    except Exception as exc:
        return False, f"Failed to get connector '{connector_name}': {exc}"

    # Step 2: resolve the layered config and apply it.
    try:
        resolved_config = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=connector.config_spec,
        )
        connector.set_config(resolved_config)
    except Exception as exc:
        return False, f"Failed to resolve configuration for {connector_name}: {exc}"

    # Step 3: run the connector's own `check` operation.
    try:
        connector.check()
    except Exception as exc:
        return False, f"Configuration for {connector_name} is invalid: {exc}"

    return True, f"Configuration for {connector_name} is valid!"
Validate a connector configuration.
Returns a tuple of (is_valid: bool, message: str).
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def list_connector_config_secrets(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector."),
    ],
) -> list[str]:
    """List all `config_secret_name` options that are known for the given connector.

    Useful for discovering which already-created config secret names exist for a
    connector. Only the secret names are returned — never the secret values.
    """
    # Only Google GSM managers support enumerating per-connector secrets;
    # the trailing path component of each handle is the bare secret name.
    return [
        handle.secret_name.split("/")[-1]
        for manager in _get_secret_sources()
        if isinstance(manager, GoogleGSMSecretManager)
        for handle in manager.fetch_connector_secrets(connector_name)
    ]
List all config_secret_name options that are known for the given connector.
This can be used to find out which already-created config secret names are available for a given connector. The return value is a list of secret names, but it will not return the actual secret values.
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def list_dotenv_secrets() -> dict[str, list[str]]:
    """List all environment variable names declared within declared .env files.

    This returns a dictionary mapping the .env file name to a list of environment
    variable names. The values of the environment variables are not returned.
    """
    # NOTE: `extra_help_text=_CONFIG_HELP` was removed from the decorator — that
    # help text documents connector-config parameters (`config`,
    # `config_secret_name`, `manifest_path`), none of which this tool accepts,
    # so appending it produced a misleading tool description.
    result: dict[str, list[str]] = {}
    for secrets_mgr in _get_secret_sources():
        if isinstance(secrets_mgr, DotenvSecretManager) and secrets_mgr.dotenv_path:
            # Key by the resolved absolute path so entries are unambiguous even
            # when multiple .env files share a basename.
            result[str(secrets_mgr.dotenv_path.resolve())] = secrets_mgr.list_secrets_names()

    return result
List all environment variable names declared within declared .env files.
This returns a dictionary mapping the .env file name to a list of environment
variable names. The values of the environment variables are not returned.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_source_streams(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[str]:
    """List all streams available in a source connector.

    This operation (generally) requires a valid configuration, including any required secrets.
    """
    # Instantiate the connector, then apply the fully-resolved configuration
    # before asking it for its stream catalog.
    connector: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    connector.set_config(
        resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=connector.config_spec,
        )
    )
    return connector.get_available_streams()
List all streams available in a source connector.
This operation (generally) requires a valid configuration, including any required secrets.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def get_source_stream_json_schema(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Get the JSON schema for a specific stream in a source connector."""
    # Docstring fixed: the previous text ("List all properties ...") misdescribed
    # the return value — this tool returns the stream's JSON schema dict, and the
    # docstring doubles as the MCP tool description.
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    return source.get_stream_json_schema(stream_name=stream_name)
Get the JSON schema for a specific stream in a source connector.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
@mcp_tool(
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def read_source_stream_records(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to read records from."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="The maximum number of records to read.",
            default=1000,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[dict[str, Any]] | str:
    """Get records from a source connector.

    Returns a list of record dicts on success, or a descriptive error string
    (including the traceback) if anything fails along the way.
    """
    try:
        source: Source = _get_mcp_source(
            connector_name=source_connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
        config_dict = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
        # First we get a generator for the records in the specified stream.
        record_generator = source.get_records(stream_name)
        # Next we load a limited number of records from the generator into our list.
        records: list[dict[str, Any]] = list(islice(record_generator, max_records))

        # BUG FIX: `sys.stderr` must be passed via the `file=` keyword. Previously
        # it was a second positional argument, so print() wrote the message AND the
        # repr of sys.stderr to stdout — corrupting the MCP stdio channel.
        print(f"Retrieved {len(records)} records from stream '{stream_name}'", file=sys.stderr)

    except Exception as ex:
        tb_str = traceback.format_exc()
        # On any error, return a descriptive error string instead of records.
        return (
            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
        )

    else:
        return records
Get records from a source connector.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
@mcp_tool(
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def get_stream_previews(
    source_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str | None,
        Field(
            description=(
                "The streams to get previews for. "
                "Use '*' for all streams, or None for selected streams."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="The maximum number of sample records to return per stream.",
            default=10,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, list[dict[str, Any]] | str]:
    """Get sample records (previews) from streams in a source connector.

    This operation requires a valid configuration, including any required secrets.
    Returns a dictionary mapping stream names to lists of sample records, or an error
    message string if an error occurred for that stream.
    """
    source: Source = _get_mcp_source(
        connector_name=source_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    source.set_config(
        resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )

    selection: list[str] | Literal["*"] | None = resolve_list_of_strings(
        streams
    )  # pyrefly: ignore[no-matching-overload]
    # A lone '*' entry means "all streams"; pass it through as the bare string.
    if selection and len(selection) == 1 and selection[0] == "*":
        selection = "*"

    try:
        samples = source.get_samples(
            streams=selection,
            limit=limit,
            on_error="ignore",
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return {
            "ERROR": f"Error getting stream previews from source '{source_name}': "
            f"{ex!r}, {ex!s}\n{tb_str}"
        }

    # Streams whose samples could not be fetched come back as None; report those
    # with a per-stream error string rather than failing the whole call.
    previews: dict[str, list[dict[str, Any]] | str] = {}
    for name, dataset in samples.items():
        previews[name] = (
            f"Could not retrieve stream samples for stream '{name}'"
            if dataset is None
            else list(dataset)
        )

    return previews
Get sample records (previews) from streams in a source connector.
This operation requires a valid configuration, including any required secrets.
Returns a dictionary mapping stream names to lists of sample records, or an error
message string if an error occurred for that stream.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
@mcp_tool(
    destructive=False,
    extra_help_text=_CONFIG_HELP,
)
def sync_source_to_cache(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str,
        Field(
            description="The streams to sync.",
            default="suggested",
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> str:
    """Run a sync from a source connector to the default DuckDB cache."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    source.set_config(
        resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )
    cache = get_default_cache()

    # Normalize the streams argument; a lone '*' or 'suggested' entry is promoted
    # to a bare string so it can be special-cased below.
    streams = resolve_list_of_strings(streams)
    if streams and len(streams) == 1 and streams[0] in {"*", "suggested"}:
        streams = streams[0]

    if streams == "suggested":
        streams = "*"  # Fallback if suggested streams cannot be determined.
        try:
            metadata = get_connector_metadata(
                source_connector_name,
            )
        except Exception:
            pass  # Metadata lookup failed; keep the '*' fallback.
        else:
            if metadata is not None:
                streams = metadata.suggested_streams or "*"

    if isinstance(streams, str) and streams != "*":
        streams = [streams]  # A single concrete stream name becomes a one-item list.

    source.read(
        cache=cache,
        streams=streams,
    )
    del cache  # Ensure the cache is closed properly

    return (
        f"Sync completed for '{source_connector_name}'!\n\n"
        "Data written to default DuckDB cache\n"
    )
Run a sync from a source connector to the default DuckDB cache.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
class CachedDatasetInfo(BaseModel):
    """Class to hold information about a cached dataset."""

    stream_name: str
    """The name of the stream in the cache."""

    # Fully-prefixed table name backing the stream (cache table_prefix + stream name).
    table_name: str
    # Schema (namespace) the table lives in; None if the cache has no schema concept.
    schema_name: str | None = None
Class to hold information about a cached dataset.
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def list_cached_streams() -> list[CachedDatasetInfo]:
    """List all streams available in the default DuckDB cache."""
    # NOTE: `extra_help_text=_CONFIG_HELP` was removed from the decorator — this
    # tool takes no connector-config arguments, so appending the config help text
    # produced a misleading tool description.
    cache: DuckDBCache = get_default_cache()
    result = [
        CachedDatasetInfo(
            stream_name=stream_name,
            # Derive the physical table name from the cache's (optional) prefix.
            table_name=(cache.table_prefix or "") + stream_name,
            schema_name=cache.schema_name,
        )
        for stream_name in cache.streams
    ]
    del cache  # Ensure the cache is closed properly
    return result
List all streams available in the default DuckDB cache.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def describe_default_cache() -> dict[str, Any]:
    """Describe the currently configured default cache."""
    # NOTE: `extra_help_text=_CONFIG_HELP` was removed from the decorator — this
    # tool takes no connector-config arguments, so appending the config help text
    # produced a misleading tool description.
    cache = get_default_cache()
    return {
        "cache_type": type(cache).__name__,
        "cache_dir": str(cache.cache_dir),
        "cache_db_path": str(Path(cache.db_path).absolute()),
        "cached_streams": list(cache.streams.keys()),
    }
Describe the currently configured default cache.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def run_sql_query(
    sql_query: Annotated[
        str,
        Field(description="The SQL query to execute."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="Maximum number of records to return.",
            default=1000,
        ),
    ],
) -> list[dict[str, Any]]:
    """Run a SQL query against the default cache.

    The dialect of SQL should match the dialect of the default cache.
    Use `describe_default_cache` to see the cache type.

    For DuckDB-type caches:
    - Use `SHOW TABLES` to list all tables.
    - Use `DESCRIBE <table_name>` to get the schema of a specific table

    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
    """
    # NOTE: `extra_help_text=_CONFIG_HELP` was removed from the decorator — this
    # tool takes SQL parameters, not connector-config arguments, so appending the
    # config help text produced a misleading tool description.
    # Check if the query is safe to execute
    if not _is_safe_sql(sql_query):
        return [
            {
                "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
                "SELECT, DESCRIBE, SHOW, EXPLAIN",
                "SQL_QUERY": sql_query,
            }
        ]

    cache: DuckDBCache = get_default_cache()
    try:
        return cache.run_sql_query(
            sql_query,
            max_records=max_records,
        )
    except Exception as ex:
        # Surface the failure as a result row rather than raising, so the MCP
        # client always receives a structured response.
        tb_str = traceback.format_exc()
        return [
            {
                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
                "TRACEBACK": tb_str,
                "SQL_QUERY": sql_query,
            }
        ]
    finally:
        del cache  # Ensure the cache is closed properly
Run a SQL query against the default cache.
The dialect of SQL should match the dialect of the default cache.
Use `describe_default_cache` to see the cache type.
For DuckDB-type caches:
- Use `SHOW TABLES` to list all tables.
- Use `DESCRIBE <table_name>` to get the schema of a specific table.
For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
@mcp_tool(
    destructive=True,
)
def destination_smoke_test(  # noqa: PLR0913, PLR0917
    destination_connector_name: Annotated[
        str,
        Field(
            description=(
                "The name of the destination connector to test "
                "(e.g. 'destination-snowflake', 'destination-motherduck')."
            ),
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description=(
                "The destination configuration as a dict object or JSON string. "
                "Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead."
            ),
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the destination configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the destination configuration.",
            default=None,
        ),
    ],
    scenarios: Annotated[
        list[str] | str,
        Field(
            description=(
                "Which scenarios to run. Use 'fast' (default) for all fast predefined "
                "scenarios (excludes large_batch_stream), 'all' for every predefined "
                "scenario including large batch, or provide a list of scenario names "
                "or a comma-separated string."
            ),
            default="fast",
        ),
    ],
    custom_scenarios: Annotated[
        list[dict[str, Any]] | None,
        Field(
            description=(
                "Additional custom test scenarios to inject. Each scenario should define "
                "'name', 'json_schema', and optionally 'records' and 'primary_key'. "
                "These are unioned with the predefined scenarios."
            ),
            default=None,
        ),
    ],
    docker_image: Annotated[
        str | None,
        Field(
            description=(
                "Optional Docker image override for the destination connector "
                "(e.g. 'airbyte/destination-snowflake:3.14.0')."
            ),
            default=None,
        ),
    ],
    namespace_suffix: Annotated[
        str | None,
        Field(
            description=(
                "Optional suffix appended to the auto-generated namespace. "
                "Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). "
                "Use this to distinguish concurrent runs."
            ),
            default=None,
        ),
    ],
    reuse_namespace: Annotated[
        str | None,
        Field(
            description=(
                "Exact namespace to reuse from a previous run. "
                "When set, no new namespace is generated. "
                "Useful for running a second test against an already-populated namespace."
            ),
            default=None,
        ),
    ],
) -> DestinationSmokeTestResult:
    """Run smoke tests against a destination connector.

    Sends synthetic test data from the smoke test source to the specified
    destination and reports success or failure. The smoke test source generates
    data across predefined scenarios covering common destination failure patterns:
    type variations, null handling, naming edge cases, schema variations, and
    batch sizes.

    This tool does NOT read back data from the destination or compare results.
    It only verifies that the destination accepts the data without errors.
    """
    # Resolve the destination configuration (inline dict/JSON, file, and/or secret).
    resolved_config = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
    )

    # Build the destination: an explicit image override wins; otherwise use
    # Docker when available. The key is omitted entirely when neither applies.
    destination_kwargs: dict[str, Any] = {
        "name": destination_connector_name,
        "config": resolved_config,
    }
    if docker_image:
        destination_kwargs["docker_image"] = docker_image
    elif is_docker_installed():
        destination_kwargs["docker_image"] = True

    destination_obj = get_destination(**destination_kwargs)

    # Scenario selection: keyword strings pass through untouched; lists are
    # normalized, falling back to 'fast' if the list resolves to nothing.
    resolved_scenarios: str | list[str] = (
        scenarios
        if isinstance(scenarios, str)
        else (resolve_list_of_strings(scenarios) or "fast")
    )

    return run_destination_smoke_test(
        destination=destination_obj,
        scenarios=resolved_scenarios,
        namespace_suffix=namespace_suffix,
        reuse_namespace=reuse_namespace,
        custom_scenarios=custom_scenarios,
    )
Run smoke tests against a destination connector.
Sends synthetic test data from the smoke test source to the specified destination and reports success or failure. The smoke test source generates data across predefined scenarios covering common destination failure patterns: type variations, null handling, naming edge cases, schema variations, and batch sizes.
This tool does NOT read back data from the destination or compare results. It only verifies that the destination accepts the data without errors.
def register_local_tools(app: FastMCP) -> None:
    """Register local tools with the FastMCP app.

    Registers the `@mcp_tool`-decorated functions defined in this module
    (identified via `__name__`) on the given application.

    Args:
        app: FastMCP application instance
    """
    register_mcp_tools(app, mcp_module=__name__)
Register local tools with the FastMCP app.
Arguments:
- app: FastMCP application instance