airbyte.mcp.local
Local MCP operations.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Local MCP operations."""

import sys
import traceback
from itertools import islice
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Any, Literal

from fastmcp import FastMCP
from fastmcp_extensions import mcp_tool, register_mcp_tools
from pydantic import BaseModel, Field

from airbyte import get_source
from airbyte._util.meta import is_docker_installed
from airbyte.caches.util import get_default_cache
from airbyte.mcp._arg_resolvers import resolve_connector_config, resolve_list_of_strings
from airbyte.registry import get_connector_metadata
from airbyte.secrets.config import _get_secret_sources
from airbyte.secrets.env_vars import DotenvSecretManager
from airbyte.secrets.google_gsm import GoogleGSMSecretManager
from airbyte.sources.base import Source


if TYPE_CHECKING:
    from airbyte.caches.duckdb import DuckDBCache


# Shared help text appended to every config-accepting MCP tool description.
_CONFIG_HELP = """
You can provide `config` as JSON or a Path to a YAML/JSON file.
If a `dict` is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
`secret_reference::ENV_VAR_NAME`.

You can also provide a `config_secret_name` to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.

If `config_secret_name` is provided, it should point to a string
that contains valid JSON or YAML.

If both `config` and `config_secret_name` are provided, the
`config` will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.

For declarative connectors, you can provide a `manifest_path` to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
"""


def _get_mcp_source(
    connector_name: str,
    override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto",
    *,
    install_if_missing: bool = True,
    manifest_path: str | Path | None,
) -> Source:
    """Get the MCP source for a connector.

    A local `manifest_path` forces yaml mode; otherwise "auto" prefers Docker
    when it is available on this host.
    """
    if manifest_path:
        override_execution_mode = "yaml"
    elif override_execution_mode == "auto" and is_docker_installed():
        override_execution_mode = "docker"

    source: Source
    if override_execution_mode == "auto":
        # Use defaults with no overrides
        source = get_source(
            connector_name,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "python":
        source = get_source(
            connector_name,
            use_python=True,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "docker":
        source = get_source(
            connector_name,
            docker_image=True,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "yaml":
        # `True` requests the registry-published manifest when no local path is given.
        source = get_source(
            connector_name,
            source_manifest=manifest_path or True,
            install_if_missing=False,
        )
    else:
        raise ValueError(
            f"Unknown execution method: {override_execution_mode}. "
            "Expected one of: ['auto', 'docker', 'python', 'yaml']."
        )

    # Ensure installed:
    if install_if_missing:
        source.executor.ensure_installation()

    return source


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def validate_connector_config(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to validate."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the connector as a dict object or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the connector configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> tuple[bool, str]:
    """Validate a connector configuration.

    Returns a tuple of (is_valid: bool, message: str).
    """
    try:
        source: Source = _get_mcp_source(
            connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
    except Exception as ex:
        return False, f"Failed to get connector '{connector_name}': {ex}"

    try:
        config_dict = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
    except Exception as ex:
        return False, f"Failed to resolve configuration for {connector_name}: {ex}"

    try:
        source.check()
    except Exception as ex:
        return False, f"Configuration for {connector_name} is invalid: {ex}"

    return True, f"Configuration for {connector_name} is valid!"


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def list_connector_config_secrets(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector."),
    ],
) -> list[str]:
    """List all `config_secret_name` options that are known for the given connector.

    This can be used to find out which already-created config secret names are available
    for a given connector. The return value is a list of secret names, but it will not
    return the actual secret values.
    """
    secrets_names: list[str] = []
    for secrets_mgr in _get_secret_sources():
        if isinstance(secrets_mgr, GoogleGSMSecretManager):
            secrets_names.extend(
                [
                    # Keep only the short secret id, not the full GSM resource path.
                    secret_handle.secret_name.split("/")[-1]
                    for secret_handle in secrets_mgr.fetch_connector_secrets(connector_name)
                ]
            )

    return secrets_names


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_dotenv_secrets() -> dict[str, list[str]]:
    """List all environment variable names declared within the configured .env files.

    This returns a dictionary mapping the .env file name to a list of environment
    variable names. The values of the environment variables are not returned.
    """
    result: dict[str, list[str]] = {}
    for secrets_mgr in _get_secret_sources():
        if isinstance(secrets_mgr, DotenvSecretManager) and secrets_mgr.dotenv_path:
            result[str(secrets_mgr.dotenv_path.resolve())] = secrets_mgr.list_secrets_names()

    return result


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_source_streams(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[str]:
    """List all streams available in a source connector.

    This operation (generally) requires a valid configuration, including any required secrets.
    """
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    return source.get_available_streams()


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def get_source_stream_json_schema(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """List all properties for a specific stream in a source connector."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    return source.get_stream_json_schema(stream_name=stream_name)


@mcp_tool(
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def read_source_stream_records(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to read records from."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="The maximum number of records to read.",
            default=1000,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[dict[str, Any]] | str:
    """Get records from a source connector.

    Returns the list of records on success, or a formatted error string on failure.
    """
    try:
        source: Source = _get_mcp_source(
            connector_name=source_connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
        config_dict = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
        # First we get a generator for the records in the specified stream.
        record_generator = source.get_records(stream_name)
        # Next we load a limited number of records from the generator into our list.
        records: list[dict[str, Any]] = list(islice(record_generator, max_records))

        # Fix: `file=` keyword is required; previously `sys.stderr` was passed as a
        # positional argument, printing the stream object itself to stdout.
        print(f"Retrieved {len(records)} records from stream '{stream_name}'", file=sys.stderr)

    except Exception as ex:
        tb_str = traceback.format_exc()
        # If any error occurs, we return the error text (with traceback) to the caller.
        return (
            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
        )

    else:
        return records


@mcp_tool(
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def get_stream_previews(
    source_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str | None,
        Field(
            description=(
                "The streams to get previews for. "
                "Use '*' for all streams, or None for selected streams."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="The maximum number of sample records to return per stream.",
            default=10,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, list[dict[str, Any]] | str]:
    """Get sample records (previews) from streams in a source connector.

    This operation requires a valid configuration, including any required secrets.
    Returns a dictionary mapping stream names to lists of sample records, or an error
    message string if an error occurred for that stream.
    """
    source: Source = _get_mcp_source(
        connector_name=source_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )

    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)

    streams_param: list[str] | Literal["*"] | None = resolve_list_of_strings(
        streams
    )  # pyrefly: ignore[no-matching-overload]
    # A single-element ["*"] means "all streams" and is passed through as the sentinel.
    if streams_param and len(streams_param) == 1 and streams_param[0] == "*":
        streams_param = "*"

    try:
        samples_result = source.get_samples(
            streams=streams_param,
            limit=limit,
            on_error="ignore",
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return {
            "ERROR": f"Error getting stream previews from source '{source_name}': "
            f"{ex!r}, {ex!s}\n{tb_str}"
        }

    result: dict[str, list[dict[str, Any]] | str] = {}
    for stream_name, dataset in samples_result.items():
        if dataset is None:
            result[stream_name] = f"Could not retrieve stream samples for stream '{stream_name}'"
        else:
            result[stream_name] = list(dataset)

    return result


@mcp_tool(
    destructive=False,
    extra_help_text=_CONFIG_HELP,
)
def sync_source_to_cache(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str,
        Field(
            description="The streams to sync.",
            default="suggested",
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> str:
    """Run a sync from a source connector to the default DuckDB cache."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    cache = get_default_cache()

    streams = resolve_list_of_strings(streams)
    if streams and len(streams) == 1 and streams[0] in {"*", "suggested"}:
        # Float '*' and 'suggested' to the top-level for special processing:
        streams = streams[0]

    if isinstance(streams, str) and streams == "suggested":
        streams = "*"  # Default to all streams if 'suggested' is not otherwise specified.
        try:
            metadata = get_connector_metadata(
                source_connector_name,
            )
        except Exception:
            streams = "*"  # Fallback to all streams if suggested streams fail.
        else:
            if metadata is not None:
                streams = metadata.suggested_streams or "*"

    if isinstance(streams, str) and streams != "*":
        streams = [streams]  # Ensure streams is a list

    source.read(
        cache=cache,
        streams=streams,
    )
    del cache  # Ensure the cache is closed properly

    summary: str = f"Sync completed for '{source_connector_name}'!\n\n"
    summary += "Data written to default DuckDB cache\n"
    return summary


class CachedDatasetInfo(BaseModel):
    """Class to hold information about a cached dataset."""

    stream_name: str
    """The name of the stream in the cache."""
    table_name: str
    schema_name: str | None = None


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_cached_streams() -> list[CachedDatasetInfo]:
    """List all streams available in the default DuckDB cache."""
    cache: DuckDBCache = get_default_cache()
    result = [
        CachedDatasetInfo(
            stream_name=stream_name,
            table_name=(cache.table_prefix or "") + stream_name,
            schema_name=cache.schema_name,
        )
        for stream_name in cache.streams
    ]
    del cache  # Ensure the cache is closed properly
    return result


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def describe_default_cache() -> dict[str, Any]:
    """Describe the currently configured default cache."""
    cache = get_default_cache()
    return {
        "cache_type": type(cache).__name__,
        "cache_dir": str(cache.cache_dir),
        "cache_db_path": str(Path(cache.db_path).absolute()),
        "cached_streams": list(cache.streams.keys()),
    }


def _is_safe_sql(sql_query: str) -> bool:
    """Check if a SQL query is safe to execute.

    For security reasons, we only allow read-only operations like SELECT, DESCRIBE, and SHOW.
    Multi-statement queries (containing semicolons) are also disallowed for security.

    Note: SQLAlchemy will also validate downstream, but this is a first-pass check.

    Args:
        sql_query: The SQL query to check

    Returns:
        True if the query is safe to execute, False otherwise
    """
    # Remove leading/trailing whitespace and convert to uppercase for checking
    normalized_query = sql_query.strip().upper()

    # Disallow multi-statement queries (containing semicolons)
    # Note: We check the original query to catch semicolons anywhere, including in comments
    if ";" in sql_query:
        return False

    # List of allowed SQL statement prefixes (read-only operations)
    allowed_prefixes = (
        "SELECT",
        "DESCRIBE",
        "DESC",  # Short form of DESCRIBE
        "SHOW",
        "EXPLAIN",  # Also safe - shows query execution plan
    )

    # Check if the query starts with any allowed prefix
    return any(normalized_query.startswith(prefix) for prefix in allowed_prefixes)


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def run_sql_query(
    sql_query: Annotated[
        str,
        Field(description="The SQL query to execute."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="Maximum number of records to return.",
            default=1000,
        ),
    ],
) -> list[dict[str, Any]]:
    """Run a SQL query against the default cache.

    The dialect of SQL should match the dialect of the default cache.
    Use `describe_default_cache` to see the cache type.

    For DuckDB-type caches:
    - Use `SHOW TABLES` to list all tables.
    - Use `DESCRIBE <table_name>` to get the schema of a specific table

    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
    """
    # Check if the query is safe to execute
    if not _is_safe_sql(sql_query):
        return [
            {
                "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
                "SELECT, DESCRIBE, SHOW, EXPLAIN",
                "SQL_QUERY": sql_query,
            }
        ]

    cache: DuckDBCache = get_default_cache()
    try:
        return cache.run_sql_query(
            sql_query,
            max_records=max_records,
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return [
            {
                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
                "TRACEBACK": tb_str,
                "SQL_QUERY": sql_query,
            }
        ]
    finally:
        del cache  # Ensure the cache is closed properly


def register_local_tools(app: FastMCP) -> None:
    """Register local tools with the FastMCP app.

    Args:
        app: FastMCP application instance
    """
    register_mcp_tools(app, mcp_module=__name__)
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def validate_connector_config(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to validate."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the connector as a dict object or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the connector configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> tuple[bool, str]:
    """Check that a connector's configuration is usable.

    Resolves the connector, layers the provided config sources, and runs the
    connector's `check` operation. The result is a `(is_valid, message)` pair
    rather than an exception, so callers always get a readable outcome.
    """
    # Step 1: resolve the connector executable itself.
    try:
        connector: Source = _get_mcp_source(
            connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
    except Exception as ex:
        return False, f"Failed to get connector '{connector_name}': {ex}"

    # Step 2: assemble and apply the configuration.
    try:
        resolved_config = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=connector.config_spec,
        )
        connector.set_config(resolved_config)
    except Exception as ex:
        return False, f"Failed to resolve configuration for {connector_name}: {ex}"

    # Step 3: let the connector verify connectivity with the applied config.
    try:
        connector.check()
    except Exception as ex:
        return False, f"Configuration for {connector_name} is invalid: {ex}"

    return True, f"Configuration for {connector_name} is valid!"
Validate a connector configuration.
Returns a tuple of (is_valid: bool, message: str).
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def list_connector_config_secrets(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector."),
    ],
) -> list[str]:
    """List all `config_secret_name` options that are known for the given connector.

    This can be used to find out which already-created config secret names are available
    for a given connector. The return value is a list of secret names, but it will not
    return the actual secret values.
    """
    found_names: list[str] = []
    for manager in _get_secret_sources():
        # Only Google Secret Manager sources support per-connector secret discovery.
        if not isinstance(manager, GoogleGSMSecretManager):
            continue
        for handle in manager.fetch_connector_secrets(connector_name):
            # Keep only the trailing path segment (the short secret id).
            found_names.append(handle.secret_name.split("/")[-1])

    return found_names
List all config_secret_name options that are known for the given connector.
This can be used to find out which already-created config secret names are available for a given connector. The return value is a list of secret names, but it will not return the actual secret values.
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_dotenv_secrets() -> dict[str, list[str]]:
    """List all environment variable names declared within the configured .env files.

    This returns a dictionary mapping the .env file name to a list of environment
    variable names. The values of the environment variables are not returned.
    """
    result: dict[str, list[str]] = {}
    for secrets_mgr in _get_secret_sources():
        # Only dotenv-backed managers with a resolvable file path are reported.
        if isinstance(secrets_mgr, DotenvSecretManager) and secrets_mgr.dotenv_path:
            result[str(secrets_mgr.dotenv_path.resolve())] = secrets_mgr.list_secrets_names()

    return result
List all environment variable names declared within the configured .env files.
This returns a dictionary mapping the .env file name to a list of environment
variable names. The values of the environment variables are not returned.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_source_streams(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[str]:
    """List all streams available in a source connector.

    This operation (generally) requires a valid configuration, including any required secrets.
    """
    # Resolve the connector, then layer and apply the configuration before discovery.
    connector: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    resolved_config = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=connector.config_spec,
    )
    connector.set_config(resolved_config)
    return connector.get_available_streams()
List all streams available in a source connector.
This operation (generally) requires a valid configuration, including any required secrets.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def get_source_stream_json_schema(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """List all properties for a specific stream in a source connector."""
    # Instantiate the connector so we can access its config spec for
    # configuration resolution, then ask it for the stream's JSON schema.
    src: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    src.set_config(
        resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=src.config_spec,
        )
    )
    return src.get_stream_json_schema(stream_name=stream_name)
List all properties for a specific stream in a source connector.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
@mcp_tool(
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def read_source_stream_records(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to read records from."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="The maximum number of records to read.",
            default=1000,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[dict[str, Any]] | str:
    """Get records from a source connector.

    Returns a list of record dicts on success, or a formatted error string
    (including the traceback) if anything fails.
    """
    try:
        source: Source = _get_mcp_source(
            connector_name=source_connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
        config_dict = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
        # First we get a generator for the records in the specified stream.
        record_generator = source.get_records(stream_name)
        # Next we load a limited number of records from the generator into our list.
        records: list[dict[str, Any]] = list(islice(record_generator, max_records))

        # BUGFIX: `sys.stderr` was previously passed as a positional argument,
        # so it was printed as a value (to stdout) rather than used as the
        # output target. The `file=` keyword actually redirects to stderr.
        print(f"Retrieved {len(records)} records from stream '{stream_name}'", file=sys.stderr)

    except Exception as ex:
        tb_str = traceback.format_exc()
        # On any error, return a diagnostic string (instead of raising) so the
        # MCP client receives a readable failure message.
        return (
            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
        )

    else:
        return records
Get records from a source connector.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
@mcp_tool(
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def get_stream_previews(
    source_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str | None,
        Field(
            description=(
                "The streams to get previews for. "
                "Use '*' for all streams, or None for selected streams."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="The maximum number of sample records to return per stream.",
            default=10,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, list[dict[str, Any]] | str]:
    """Get sample records (previews) from streams in a source connector.

    This operation requires a valid configuration, including any required secrets.
    Returns a dictionary mapping stream names to lists of sample records, or an error
    message string if an error occurred for that stream.
    """
    src: Source = _get_mcp_source(
        connector_name=source_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    src.set_config(
        resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=src.config_spec,
        )
    )

    # Normalize the streams argument: a list containing only "*" collapses to
    # the literal "*" sentinel understood by `get_samples`.
    selected: list[str] | Literal["*"] | None = resolve_list_of_strings(
        streams
    )  # pyrefly: ignore[no-matching-overload]
    if selected == ["*"]:
        selected = "*"

    try:
        samples_result = src.get_samples(
            streams=selected,
            limit=limit,
            on_error="ignore",
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        # Surface the failure as a single "ERROR" entry rather than raising.
        return {
            "ERROR": f"Error getting stream previews from source '{source_name}': "
            f"{ex!r}, {ex!s}\n{tb_str}"
        }

    previews: dict[str, list[dict[str, Any]] | str] = {}
    for stream_name, dataset in samples_result.items():
        # A `None` dataset means sampling failed for that stream (on_error="ignore").
        previews[stream_name] = (
            f"Could not retrieve stream samples for stream '{stream_name}'"
            if dataset is None
            else list(dataset)
        )

    return previews
Get sample records (previews) from streams in a source connector.
This operation requires a valid configuration, including any required secrets.
Returns a dictionary mapping stream names to lists of sample records, or an error
message string if an error occurred for that stream.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
@mcp_tool(
    destructive=False,
    extra_help_text=_CONFIG_HELP,
)
def sync_source_to_cache(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str,
        Field(
            description="The streams to sync.",
            default="suggested",
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> str:
    """Run a sync from a source connector to the default DuckDB cache.

    `streams` accepts an explicit list of stream names, "*" for all streams,
    or "suggested" (the default) to use the connector registry's suggested
    streams, falling back to all streams when none are available.
    """
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    cache = get_default_cache()

    # Coerce comma-separated / scalar input into a list of stream names.
    streams = resolve_list_of_strings(streams)
    if streams and len(streams) == 1 and streams[0] in {"*", "suggested"}:
        # Float '*' and 'suggested' to the top-level for special processing:
        streams = streams[0]

    if isinstance(streams, str) and streams == "suggested":
        streams = "*"  # Default to all streams if 'suggested' is not otherwise specified.
        try:
            metadata = get_connector_metadata(
                source_connector_name,
            )
        except Exception:
            streams = "*"  # Fallback to all streams if suggested streams fail.
        else:
            if metadata is not None:
                # Prefer the registry's suggested streams; "*" if none listed.
                streams = metadata.suggested_streams or "*"

    if isinstance(streams, str) and streams != "*":
        streams = [streams]  # Ensure streams is a list

    source.read(
        cache=cache,
        streams=streams,
    )
    del cache  # Ensure the cache is closed properly

    summary: str = f"Sync completed for '{source_connector_name}'!\n\n"
    summary += "Data written to default DuckDB cache\n"
    return summary
Run a sync from a source connector to the default DuckDB cache.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
class CachedDatasetInfo(BaseModel):
    """Class to hold information about a cached dataset."""

    stream_name: str
    """The name of the stream in the cache."""

    # Name of the SQL table backing the stream; populated by callers as the
    # cache's table prefix (if any) plus the stream name.
    table_name: str
    # SQL schema containing the table, when the cache defines one.
    schema_name: str | None = None
Class to hold information about a cached dataset.
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_cached_streams() -> list[CachedDatasetInfo]:
    """List all streams available in the default DuckDB cache."""
    # NOTE(review): this tool takes no config arguments, so _CONFIG_HELP in the
    # decorator may be superfluous here — confirm before removing.
    cache: DuckDBCache = get_default_cache()
    prefix = cache.table_prefix or ""
    entries: list[CachedDatasetInfo] = []
    for name in cache.streams:
        entries.append(
            CachedDatasetInfo(
                stream_name=name,
                table_name=prefix + name,
                schema_name=cache.schema_name,
            )
        )
    del cache  # Ensure the cache is closed properly
    return entries
List all streams available in the default DuckDB cache.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def describe_default_cache() -> dict[str, Any]:
    """Describe the currently configured default cache."""
    cache = get_default_cache()
    description: dict[str, Any] = {}
    description["cache_type"] = type(cache).__name__
    description["cache_dir"] = str(cache.cache_dir)
    # Absolute path gives callers an unambiguous on-disk location.
    description["cache_db_path"] = str(Path(cache.db_path).absolute())
    description["cached_streams"] = list(cache.streams.keys())
    return description
Describe the currently configured default cache.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def run_sql_query(
    sql_query: Annotated[
        str,
        Field(description="The SQL query to execute."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="Maximum number of records to return.",
            default=1000,
        ),
    ],
) -> list[dict[str, Any]]:
    """Run a SQL query against the default cache.

    The dialect of SQL should match the dialect of the default cache.
    Use `describe_default_cache` to see the cache type.

    For DuckDB-type caches:
    - Use `SHOW TABLES` to list all tables.
    - Use `DESCRIBE <table_name>` to get the schema of a specific table

    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
    """
    # Guard clause: reject anything that is not a read-only statement before
    # touching the cache at all.
    if not _is_safe_sql(sql_query):
        return [
            {
                "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
                "SELECT, DESCRIBE, SHOW, EXPLAIN",
                "SQL_QUERY": sql_query,
            }
        ]

    cache: DuckDBCache = get_default_cache()
    try:
        rows = cache.run_sql_query(
            sql_query,
            max_records=max_records,
        )
    except Exception as ex:
        # Errors are returned as a single-row diagnostic payload, not raised.
        return [
            {
                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
                "TRACEBACK": traceback.format_exc(),
                "SQL_QUERY": sql_query,
            }
        ]
    else:
        return rows
    finally:
        del cache  # Ensure the cache is closed properly
Run a SQL query against the default cache.
The dialect of SQL should match the dialect of the default cache.
Use `describe_default_cache` to see the cache type.
For DuckDB-type caches:
- Use `SHOW TABLES` to list all tables.
- Use `DESCRIBE <table_name>` to get the schema of a specific table
For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
def register_local_tools(app: FastMCP) -> None:
    """Register local tools with the FastMCP app.

    Delegates to `register_mcp_tools`, which presumably discovers and registers
    the `@mcp_tool`-decorated functions defined in this module — confirm
    against `fastmcp_extensions` docs.

    Args:
        app: FastMCP application instance
    """
    register_mcp_tools(app, mcp_module=__name__)
Register local tools with the FastMCP app.
Arguments:
- app: FastMCP application instance