airbyte.mcp.local_ops
Local MCP operations.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Local MCP operations."""

import sys
import traceback
from itertools import islice
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Any, Literal

from fastmcp import FastMCP
from pydantic import BaseModel, Field

from airbyte import get_source
from airbyte._util.meta import is_docker_installed
from airbyte.caches.util import get_default_cache
from airbyte.mcp._tool_utils import mcp_tool, register_tools
from airbyte.mcp._util import resolve_config, resolve_list_of_strings
from airbyte.secrets.config import _get_secret_sources
from airbyte.secrets.env_vars import DotenvSecretManager
from airbyte.secrets.google_gsm import GoogleGSMSecretManager
from airbyte.sources.base import Source
from airbyte.sources.registry import get_connector_metadata


if TYPE_CHECKING:
    from airbyte.caches.duckdb import DuckDBCache


_CONFIG_HELP = """
You can provide `config` as JSON or a Path to a YAML/JSON file.
If a `dict` is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
`secret_reference::ENV_VAR_NAME`.

You can also provide a `config_secret_name` to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.

If `config_secret_name` is provided, it should point to a string
that contains valid JSON or YAML.

If both `config` and `config_secret_name` are provided, the
`config` will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.

For declarative connectors, you can provide a `manifest_path` to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
"""


def _get_mcp_source(
    connector_name: str,
    override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto",
    *,
    install_if_missing: bool = True,
    manifest_path: str | Path | None,
) -> Source:
    """Get the MCP source for a connector.

    A local `manifest_path` forces yaml mode; otherwise "auto" prefers docker
    when a Docker daemon is available.
    """
    if manifest_path:
        override_execution_mode = "yaml"
    elif override_execution_mode == "auto" and is_docker_installed():
        override_execution_mode = "docker"

    source: Source
    if override_execution_mode == "auto":
        # Use defaults with no overrides
        source = get_source(
            connector_name,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "python":
        source = get_source(
            connector_name,
            use_python=True,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "docker":
        source = get_source(
            connector_name,
            docker_image=True,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "yaml":
        source = get_source(
            connector_name,
            source_manifest=manifest_path or True,
            install_if_missing=False,
        )
    else:
        raise ValueError(
            f"Unknown execution method: {override_execution_mode}. "
            "Expected one of: ['auto', 'docker', 'python', 'yaml']."
        )

    # Ensure installed:
    if install_if_missing:
        source.executor.ensure_installation()

    return source


@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def validate_connector_config(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to validate."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the connector as a dict object or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the connector configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> tuple[bool, str]:
    """Validate a connector configuration.

    Returns a tuple of (is_valid: bool, message: str).
    """
    try:
        source: Source = _get_mcp_source(
            connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
    except Exception as ex:
        return False, f"Failed to get connector '{connector_name}': {ex}"

    try:
        config_dict = resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
    except Exception as ex:
        return False, f"Failed to resolve configuration for {connector_name}: {ex}"

    try:
        source.check()
    except Exception as ex:
        return False, f"Configuration for {connector_name} is invalid: {ex}"

    return True, f"Configuration for {connector_name} is valid!"


@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
)
def list_connector_config_secrets(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector."),
    ],
) -> list[str]:
    """List all `config_secret_name` options that are known for the given connector.

    This can be used to find out which already-created config secret names are available
    for a given connector. The return value is a list of secret names, but it will not
    return the actual secret values.
    """
    secrets_names: list[str] = []
    for secrets_mgr in _get_secret_sources():
        if isinstance(secrets_mgr, GoogleGSMSecretManager):
            secrets_names.extend(
                [
                    secret_handle.secret_name.split("/")[-1]
                    for secret_handle in secrets_mgr.fetch_connector_secrets(connector_name)
                ]
            )

    return secrets_names


@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_dotenv_secrets() -> dict[str, list[str]]:
    """List all environment variable names declared in the configured .env files.

    This returns a dictionary mapping the .env file name to a list of environment
    variable names. The values of the environment variables are not returned.
    """
    result: dict[str, list[str]] = {}
    for secrets_mgr in _get_secret_sources():
        if isinstance(secrets_mgr, DotenvSecretManager) and secrets_mgr.dotenv_path:
            result[str(secrets_mgr.dotenv_path.resolve())] = secrets_mgr.list_secrets_names()

    return result


@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_source_streams(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[str]:
    """List all streams available in a source connector.

    This operation (generally) requires a valid configuration, including any required secrets.
    """
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    return source.get_available_streams()


@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def get_source_stream_json_schema(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Get the JSON schema for a specific stream in a source connector."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    return source.get_stream_json_schema(stream_name=stream_name)


@mcp_tool(
    domain="local",
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def read_source_stream_records(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to read records from."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="The maximum number of records to read.",
            default=1000,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[dict[str, Any]] | str:
    """Get records from a source connector."""
    try:
        source: Source = _get_mcp_source(
            connector_name=source_connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
        config_dict = resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
        # First we get a generator for the records in the specified stream.
        record_generator = source.get_records(stream_name)
        # Next we load a limited number of records from the generator into our list.
        records: list[dict[str, Any]] = list(islice(record_generator, max_records))

        # Progress note goes to stderr so it does not pollute the tool's stdout.
        # BUGFIX: `sys.stderr` was previously passed positionally (printed as a value);
        # it must be given via the `file=` keyword.
        print(
            f"Retrieved {len(records)} records from stream '{stream_name}'",
            file=sys.stderr,
        )

    except Exception as ex:
        tb_str = traceback.format_exc()
        # If any error occurs, return an error message string (with traceback) to the caller.
        return (
            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
        )

    else:
        return records


@mcp_tool(
    domain="local",
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def get_stream_previews(
    source_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str | None,
        Field(
            description=(
                "The streams to get previews for. "
                "Use '*' for all streams, or None for selected streams."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="The maximum number of sample records to return per stream.",
            default=10,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, list[dict[str, Any]] | str]:
    """Get sample records (previews) from streams in a source connector.

    This operation requires a valid configuration, including any required secrets.
    Returns a dictionary mapping stream names to lists of sample records, or an error
    message string if an error occurred for that stream.
    """
    source: Source = _get_mcp_source(
        connector_name=source_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )

    config_dict = resolve_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)

    streams_param: list[str] | Literal["*"] | None = resolve_list_of_strings(
        streams
    )  # pyrefly: ignore[no-matching-overload]
    # A single-element ["*"] collapses to the "*" sentinel meaning "all streams".
    if streams_param and len(streams_param) == 1 and streams_param[0] == "*":
        streams_param = "*"

    try:
        samples_result = source.get_samples(
            streams=streams_param,
            limit=limit,
            on_error="ignore",
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return {
            "ERROR": f"Error getting stream previews from source '{source_name}': "
            f"{ex!r}, {ex!s}\n{tb_str}"
        }

    result: dict[str, list[dict[str, Any]] | str] = {}
    for stream_name, dataset in samples_result.items():
        if dataset is None:
            result[stream_name] = f"Could not retrieve stream samples for stream '{stream_name}'"
        else:
            result[stream_name] = list(dataset)

    return result


@mcp_tool(
    domain="local",
    destructive=False,
    extra_help_text=_CONFIG_HELP,
)
def sync_source_to_cache(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str,
        Field(
            description="The streams to sync.",
            default="suggested",
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> str:
    """Run a sync from a source connector to the default DuckDB cache."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    cache = get_default_cache()

    streams = resolve_list_of_strings(streams)
    if streams and len(streams) == 1 and streams[0] in {"*", "suggested"}:
        # Float '*' and 'suggested' to the top-level for special processing:
        streams = streams[0]

    if isinstance(streams, str) and streams == "suggested":
        streams = "*"  # Default to all streams if 'suggested' is not otherwise specified.
        try:
            metadata = get_connector_metadata(
                source_connector_name,
            )
        except Exception:
            streams = "*"  # Fallback to all streams if suggested streams fail.
        else:
            if metadata is not None:
                streams = metadata.suggested_streams or "*"

    if isinstance(streams, str) and streams != "*":
        streams = [streams]  # Ensure streams is a list

    source.read(
        cache=cache,
        streams=streams,
    )
    del cache  # Ensure the cache is closed properly

    summary: str = f"Sync completed for '{source_connector_name}'!\n\n"
    summary += "Data written to default DuckDB cache\n"
    return summary


class CachedDatasetInfo(BaseModel):
    """Class to hold information about a cached dataset."""

    # The name of the stream in the cache.
    stream_name: str
    # The (possibly prefixed) table name holding the stream data.
    table_name: str
    # The schema containing the table, if applicable.
    schema_name: str | None = None


@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_cached_streams() -> list[CachedDatasetInfo]:
    """List all streams available in the default DuckDB cache."""
    cache: DuckDBCache = get_default_cache()
    result = [
        CachedDatasetInfo(
            stream_name=stream_name,
            table_name=(cache.table_prefix or "") + stream_name,
            schema_name=cache.schema_name,
        )
        for stream_name in cache.streams
    ]
    del cache  # Ensure the cache is closed properly
    return result


@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def describe_default_cache() -> dict[str, Any]:
    """Describe the currently configured default cache."""
    cache = get_default_cache()
    return {
        "cache_type": type(cache).__name__,
        "cache_dir": str(cache.cache_dir),
        "cache_db_path": str(Path(cache.db_path).absolute()),
        "cached_streams": list(cache.streams.keys()),
    }


def _is_safe_sql(sql_query: str) -> bool:
    """Check if a SQL query is safe to execute.

    For security reasons, we only allow read-only operations like SELECT, DESCRIBE, and SHOW.
    Multi-statement queries (containing semicolons) are also disallowed for security.

    Note: SQLAlchemy will also validate downstream, but this is a first-pass check.

    Args:
        sql_query: The SQL query to check

    Returns:
        True if the query is safe to execute, False otherwise
    """
    # Remove leading/trailing whitespace and convert to uppercase for checking
    normalized_query = sql_query.strip().upper()

    # Disallow multi-statement queries (containing semicolons)
    # Note: We check the original query to catch semicolons anywhere, including in comments
    if ";" in sql_query:
        return False

    # List of allowed SQL statement prefixes (read-only operations)
    allowed_prefixes = (
        "SELECT",
        "DESCRIBE",
        "DESC",  # Short form of DESCRIBE
        "SHOW",
        "EXPLAIN",  # Also safe - shows query execution plan
    )

    # Check if the query starts with any allowed prefix
    return any(normalized_query.startswith(prefix) for prefix in allowed_prefixes)


@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def run_sql_query(
    sql_query: Annotated[
        str,
        Field(description="The SQL query to execute."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="Maximum number of records to return.",
            default=1000,
        ),
    ],
) -> list[dict[str, Any]]:
    """Run a SQL query against the default cache.

    The dialect of SQL should match the dialect of the default cache.
    Use `describe_default_cache` to see the cache type.

    For DuckDB-type caches:
    - Use `SHOW TABLES` to list all tables.
    - Use `DESCRIBE <table_name>` to get the schema of a specific table

    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
    """
    # Check if the query is safe to execute
    if not _is_safe_sql(sql_query):
        return [
            {
                "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
                "SELECT, DESCRIBE, SHOW, EXPLAIN",
                "SQL_QUERY": sql_query,
            }
        ]

    cache: DuckDBCache = get_default_cache()
    try:
        return cache.run_sql_query(
            sql_query,
            max_records=max_records,
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return [
            {
                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
                "TRACEBACK": tb_str,
                "SQL_QUERY": sql_query,
            }
        ]
    finally:
        del cache  # Ensure the cache is closed properly


def register_local_ops_tools(app: FastMCP) -> None:
    """@private Register tools with the FastMCP app.

    This is an internal function and should not be called directly.
    """
    register_tools(app, domain="local")
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def validate_connector_config(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to validate."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the connector as a dict object or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the connector configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> tuple[bool, str]:
    """Validate a connector configuration.

    Returns a tuple of (is_valid: bool, message: str).
    """
    # Resolve the connector; a failure here means the connector itself is unavailable.
    try:
        source: Source = _get_mcp_source(
            connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
    except Exception as ex:
        return False, f"Failed to get connector '{connector_name}': {ex}"

    # Layer config sources (inline, file, secret) and apply to the source.
    try:
        config_dict = resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
    except Exception as ex:
        return False, f"Failed to resolve configuration for {connector_name}: {ex}"

    # `check()` performs the connector's own connection test.
    try:
        source.check()
    except Exception as ex:
        return False, f"Configuration for {connector_name} is invalid: {ex}"

    return True, f"Configuration for {connector_name} is valid!"
Validate a connector configuration.
Returns a tuple of (is_valid: bool, message: str).
189@mcp_tool( 190 domain="local", 191 read_only=True, 192 idempotent=True, 193) 194def list_connector_config_secrets( 195 connector_name: Annotated[ 196 str, 197 Field(description="The name of the connector."), 198 ], 199) -> list[str]: 200 """List all `config_secret_name` options that are known for the given connector. 201 202 This can be used to find out which already-created config secret names are available 203 for a given connector. The return value is a list of secret names, but it will not 204 return the actual secret values. 205 """ 206 secrets_names: list[str] = [] 207 for secrets_mgr in _get_secret_sources(): 208 if isinstance(secrets_mgr, GoogleGSMSecretManager): 209 secrets_names.extend( 210 [ 211 secret_handle.secret_name.split("/")[-1] 212 for secret_handle in secrets_mgr.fetch_connector_secrets(connector_name) 213 ] 214 ) 215 216 return secrets_names
List all config_secret_name options that are known for the given connector.
This can be used to find out which already-created config secret names are available for a given connector. The return value is a list of secret names, but it will not return the actual secret values.
219@mcp_tool( 220 domain="local", 221 read_only=True, 222 idempotent=True, 223 extra_help_text=_CONFIG_HELP, 224) 225def list_dotenv_secrets() -> dict[str, list[str]]: 226 """List all environment variable names declared within declared .env files. 227 228 This returns a dictionary mapping the .env file name to a list of environment 229 variable names. The values of the environment variables are not returned. 230 """ 231 result: dict[str, list[str]] = {} 232 for secrets_mgr in _get_secret_sources(): 233 if isinstance(secrets_mgr, DotenvSecretManager) and secrets_mgr.dotenv_path: 234 result[str(secrets_mgr.dotenv_path.resolve())] = secrets_mgr.list_secrets_names() 235 236 return result
List all environment variable names declared in the configured .env files.
This returns a dictionary mapping the .env file name to a list of environment variable names. The values of the environment variables are not returned.
239@mcp_tool( 240 domain="local", 241 read_only=True, 242 idempotent=True, 243 extra_help_text=_CONFIG_HELP, 244) 245def list_source_streams( 246 source_connector_name: Annotated[ 247 str, 248 Field(description="The name of the source connector."), 249 ], 250 config: Annotated[ 251 dict | str | None, 252 Field( 253 description="The configuration for the source connector as a dict or JSON string.", 254 default=None, 255 ), 256 ], 257 config_file: Annotated[ 258 str | Path | None, 259 Field( 260 description="Path to a YAML or JSON file containing the source connector config.", 261 default=None, 262 ), 263 ], 264 config_secret_name: Annotated[ 265 str | None, 266 Field( 267 description="The name of the secret containing the configuration.", 268 default=None, 269 ), 270 ], 271 override_execution_mode: Annotated[ 272 Literal["docker", "python", "yaml", "auto"], 273 Field( 274 description="Optionally override the execution method to use for the connector. " 275 "This parameter is ignored if manifest_path is provided (yaml mode will be used).", 276 default="auto", 277 ), 278 ], 279 manifest_path: Annotated[ 280 str | Path | None, 281 Field( 282 description="Path to a local YAML manifest file for declarative connectors.", 283 default=None, 284 ), 285 ], 286) -> list[str]: 287 """List all streams available in a source connector. 288 289 This operation (generally) requires a valid configuration, including any required secrets. 290 """ 291 source: Source = _get_mcp_source( 292 connector_name=source_connector_name, 293 override_execution_mode=override_execution_mode, 294 manifest_path=manifest_path, 295 ) 296 config_dict = resolve_config( 297 config=config, 298 config_file=config_file, 299 config_secret_name=config_secret_name, 300 config_spec_jsonschema=source.config_spec, 301 ) 302 source.set_config(config_dict) 303 return source.get_available_streams()
List all streams available in a source connector.
This operation (generally) requires a valid configuration, including any required secrets.
306@mcp_tool( 307 domain="local", 308 read_only=True, 309 idempotent=True, 310 extra_help_text=_CONFIG_HELP, 311) 312def get_source_stream_json_schema( 313 source_connector_name: Annotated[ 314 str, 315 Field(description="The name of the source connector."), 316 ], 317 stream_name: Annotated[ 318 str, 319 Field(description="The name of the stream."), 320 ], 321 config: Annotated[ 322 dict | str | None, 323 Field( 324 description="The configuration for the source connector as a dict or JSON string.", 325 default=None, 326 ), 327 ], 328 config_file: Annotated[ 329 str | Path | None, 330 Field( 331 description="Path to a YAML or JSON file containing the source connector config.", 332 default=None, 333 ), 334 ], 335 config_secret_name: Annotated[ 336 str | None, 337 Field( 338 description="The name of the secret containing the configuration.", 339 default=None, 340 ), 341 ], 342 override_execution_mode: Annotated[ 343 Literal["docker", "python", "yaml", "auto"], 344 Field( 345 description="Optionally override the execution method to use for the connector. " 346 "This parameter is ignored if manifest_path is provided (yaml mode will be used).", 347 default="auto", 348 ), 349 ], 350 manifest_path: Annotated[ 351 str | Path | None, 352 Field( 353 description="Path to a local YAML manifest file for declarative connectors.", 354 default=None, 355 ), 356 ], 357) -> dict[str, Any]: 358 """List all properties for a specific stream in a source connector.""" 359 source: Source = _get_mcp_source( 360 connector_name=source_connector_name, 361 override_execution_mode=override_execution_mode, 362 manifest_path=manifest_path, 363 ) 364 config_dict = resolve_config( 365 config=config, 366 config_file=config_file, 367 config_secret_name=config_secret_name, 368 config_spec_jsonschema=source.config_spec, 369 ) 370 source.set_config(config_dict) 371 return source.get_stream_json_schema(stream_name=stream_name)
List all properties for a specific stream in a source connector.
@mcp_tool(
    domain="local",
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def read_source_stream_records(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to read records from."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="The maximum number of records to read.",
            default=1000,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[dict[str, Any]] | str:
    """Get records from a source connector.

    Returns up to `max_records` record dicts from the named stream on success,
    or a string describing the error (including the traceback) on failure.
    """
    try:
        source: Source = _get_mcp_source(
            connector_name=source_connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
        config_dict = resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
        # First we get a generator for the records in the specified stream.
        record_generator = source.get_records(stream_name)
        # Next we load a limited number of records from the generator into our list.
        records: list[dict[str, Any]] = list(islice(record_generator, max_records))

        # BUGFIX: `sys.stderr` was previously passed as a second positional
        # argument to print() (so the stream object itself was printed to
        # stdout); it must be passed as the `file=` keyword argument.
        print(
            f"Retrieved {len(records)} records from stream '{stream_name}'",
            file=sys.stderr,
        )

    except Exception as ex:
        tb_str = traceback.format_exc()
        # If any error occurs, we return the error message as a string so the
        # MCP client receives a useful diagnostic rather than a crash.
        return (
            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
        )

    else:
        return records
Get records from a source connector.
@mcp_tool(
    domain="local",
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def get_stream_previews(
    source_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str | None,
        Field(
            description=(
                "The streams to get previews for. "
                "Use '*' for all streams, or None for selected streams."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="The maximum number of sample records to return per stream.",
            default=10,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, list[dict[str, Any]] | str]:
    """Get sample records (previews) from streams in a source connector.

    This operation requires a valid configuration, including any required secrets.
    Returns a dictionary mapping stream names to lists of sample records, or an error
    message string if an error occurred for that stream.
    """
    source: Source = _get_mcp_source(
        connector_name=source_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )

    source.set_config(
        resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )

    streams_param: list[str] | Literal["*"] | None = resolve_list_of_strings(
        streams
    )  # pyrefly: ignore[no-matching-overload]
    # A single-element ["*"] list is shorthand for the "all streams" sentinel.
    if streams_param == ["*"]:
        streams_param = "*"

    try:
        samples_result = source.get_samples(
            streams=streams_param,
            limit=limit,
            on_error="ignore",
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return {
            "ERROR": f"Error getting stream previews from source '{source_name}': "
            f"{ex!r}, {ex!s}\n{tb_str}"
        }

    # Per-stream results: a list of records, or an error string when the
    # sampler returned None for that stream.
    return {
        stream_name: (
            f"Could not retrieve stream samples for stream '{stream_name}'"
            if dataset is None
            else list(dataset)
        )
        for stream_name, dataset in samples_result.items()
    }
Get sample records (previews) from streams in a source connector.
This operation requires a valid configuration, including any required secrets. Returns a dictionary mapping stream names to lists of sample records, or an error message string if an error occurred for that stream.
@mcp_tool(
    domain="local",
    destructive=False,
    extra_help_text=_CONFIG_HELP,
)
def sync_source_to_cache(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str,
        Field(
            description="The streams to sync.",
            default="suggested",
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> str:
    """Run a sync from a source connector to the default DuckDB cache.

    `streams` may be an explicit list of stream names, "*" for all streams, or
    "suggested" (the default) to use the registry's suggested streams when
    available, falling back to all streams otherwise.
    """
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    cache = get_default_cache()

    streams = resolve_list_of_strings(streams)
    if streams and len(streams) == 1 and streams[0] in {"*", "suggested"}:
        # Float '*' and 'suggested' to the top-level for special processing:
        streams = streams[0]

    if isinstance(streams, str) and streams == "suggested":
        streams = "*"  # Default to all streams if 'suggested' is not otherwise specified.
        # NOTE(review): the metadata lookup below only runs for the 'suggested'
        # sentinel, so explicit user-provided stream lists are never overridden.
        try:
            metadata = get_connector_metadata(
                source_connector_name,
            )
        except Exception:
            streams = "*"  # Fallback to all streams if suggested streams fail.
        else:
            if metadata is not None:
                # Registry may report no suggested streams; '*' is the fallback.
                streams = metadata.suggested_streams or "*"

    if isinstance(streams, str) and streams != "*":
        streams = [streams]  # Ensure streams is a list

    source.read(
        cache=cache,
        streams=streams,
    )
    # Drop our reference; presumably this lets the cache finalize/close —
    # `del` alone does not guarantee closure, TODO confirm.
    del cache

    summary: str = f"Sync completed for '{source_connector_name}'!\n\n"
    summary += "Data written to default DuckDB cache\n"
    return summary
Run a sync from a source connector to the default DuckDB cache.
class CachedDatasetInfo(BaseModel):
    """Class to hold information about a cached dataset."""

    stream_name: str
    """The name of the stream in the cache."""

    # Physical table name; populated as (table_prefix or "") + stream_name
    # by list_cached_streams().
    table_name: str
    # SQL schema containing the table; copied from the cache's schema_name.
    schema_name: str | None = None
Class to hold information about a cached dataset.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_cached_streams() -> list[CachedDatasetInfo]:
    """List all streams available in the default DuckDB cache."""
    cache: DuckDBCache = get_default_cache()
    # Hoist the (optional) table prefix once rather than per stream.
    prefix = cache.table_prefix or ""
    datasets: list[CachedDatasetInfo] = []
    for stream_name in cache.streams:
        datasets.append(
            CachedDatasetInfo(
                stream_name=stream_name,
                table_name=prefix + stream_name,
                schema_name=cache.schema_name,
            )
        )
    del cache  # Ensure the cache is closed properly
    return datasets
List all streams available in the default DuckDB cache.
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def describe_default_cache() -> dict[str, Any]:
    """Describe the currently configured default cache."""
    cache = get_default_cache()
    # Report the absolute DB path so clients can locate the file on disk.
    db_path = Path(cache.db_path).absolute()
    description: dict[str, Any] = {
        "cache_type": type(cache).__name__,
        "cache_dir": str(cache.cache_dir),
        "cache_db_path": str(db_path),
        "cached_streams": list(cache.streams.keys()),
    }
    return description
Describe the currently configured default cache.
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def run_sql_query(
    sql_query: Annotated[
        str,
        Field(description="The SQL query to execute."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="Maximum number of records to return.",
            default=1000,
        ),
    ],
) -> list[dict[str, Any]]:
    """Run a SQL query against the default cache.

    The dialect of SQL should match the dialect of the default cache.
    Use `describe_default_cache` to see the cache type.

    For DuckDB-type caches:
    - Use `SHOW TABLES` to list all tables.
    - Use `DESCRIBE <table_name>` to get the schema of a specific table

    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
    """
    # Reject anything that is not a read-only statement before touching the cache.
    if not _is_safe_sql(sql_query):
        return [
            {
                "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
                "SELECT, DESCRIBE, SHOW, EXPLAIN",
                "SQL_QUERY": sql_query,
            }
        ]

    cache: DuckDBCache = get_default_cache()
    try:
        result = cache.run_sql_query(
            sql_query,
            max_records=max_records,
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        # Surface the failure as data so the MCP client gets a diagnostic payload.
        result = [
            {
                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
                "TRACEBACK": tb_str,
                "SQL_QUERY": sql_query,
            }
        ]
    finally:
        del cache  # Ensure the cache is closed properly
    return result
Run a SQL query against the default cache.
The dialect of SQL should match the dialect of the default cache.
Use describe_default_cache to see the cache type.
For DuckDB-type caches:
- Use `SHOW TABLES` to list all tables.
- Use `DESCRIBE <table_name>` to get the schema of a specific table.
For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.