airbyte.mcp.local

Local MCP operations.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Local MCP operations."""
  3
  4import sys
  5import traceback
  6from itertools import islice
  7from pathlib import Path
  8from typing import TYPE_CHECKING, Annotated, Any, Literal
  9
 10from fastmcp import FastMCP
 11from fastmcp_extensions import mcp_tool, register_mcp_tools
 12from pydantic import BaseModel, Field
 13
 14from airbyte import get_source
 15from airbyte._util.meta import is_docker_installed
 16from airbyte.caches.util import get_default_cache
 17from airbyte.mcp._arg_resolvers import resolve_connector_config, resolve_list_of_strings
 18from airbyte.registry import get_connector_metadata
 19from airbyte.secrets.config import _get_secret_sources
 20from airbyte.secrets.env_vars import DotenvSecretManager
 21from airbyte.secrets.google_gsm import GoogleGSMSecretManager
 22from airbyte.sources.base import Source
 23
 24
 25if TYPE_CHECKING:
 26    from airbyte.caches.duckdb import DuckDBCache
 27
 28
# Shared help text appended to the descriptions of MCP tools that accept
# connector-configuration arguments (passed via `extra_help_text=_CONFIG_HELP`).
_CONFIG_HELP = """
You can provide `config` as JSON or a Path to a YAML/JSON file.
If a `dict` is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
`secret_reference::ENV_VAR_NAME`.

You can also provide a `config_secret_name` to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.

If `config_secret_name` is provided, it should point to a string
that contains valid JSON or YAML.

If both `config` and `config_secret_name` are provided, the
`config` will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.

For declarative connectors, you can provide a `manifest_path` to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
"""
 52
 53
 54def _get_mcp_source(
 55    connector_name: str,
 56    override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto",
 57    *,
 58    install_if_missing: bool = True,
 59    manifest_path: str | Path | None,
 60) -> Source:
 61    """Get the MCP source for a connector."""
 62    if manifest_path:
 63        override_execution_mode = "yaml"
 64    elif override_execution_mode == "auto" and is_docker_installed():
 65        override_execution_mode = "docker"
 66
 67    source: Source
 68    if override_execution_mode == "auto":
 69        # Use defaults with no overrides
 70        source = get_source(
 71            connector_name,
 72            install_if_missing=False,
 73            source_manifest=manifest_path or None,
 74        )
 75    elif override_execution_mode == "python":
 76        source = get_source(
 77            connector_name,
 78            use_python=True,
 79            install_if_missing=False,
 80            source_manifest=manifest_path or None,
 81        )
 82    elif override_execution_mode == "docker":
 83        source = get_source(
 84            connector_name,
 85            docker_image=True,
 86            install_if_missing=False,
 87            source_manifest=manifest_path or None,
 88        )
 89    elif override_execution_mode == "yaml":
 90        source = get_source(
 91            connector_name,
 92            source_manifest=manifest_path or True,
 93            install_if_missing=False,
 94        )
 95    else:
 96        raise ValueError(
 97            f"Unknown execution method: {override_execution_mode}. "
 98            "Expected one of: ['auto', 'docker', 'python', 'yaml']."
 99        )
100
101    # Ensure installed:
102    if install_if_missing:
103        source.executor.ensure_installation()
104
105    return source
106
107
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def validate_connector_config(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to validate."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the connector as a dict object or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the connector configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> tuple[bool, str]:
    """Validate a connector configuration.

    Returns a tuple of (is_valid: bool, message: str).
    """
    # Step 1: resolve the connector itself.
    try:
        source: Source = _get_mcp_source(
            connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
    except Exception as ex:
        return False, f"Failed to get connector '{connector_name}': {ex}"

    # Step 2: resolve and apply the configuration (secrets layered on top).
    try:
        resolved_config = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(resolved_config)
    except Exception as ex:
        return False, f"Failed to resolve configuration for {connector_name}: {ex}"

    # Step 3: run the connector's own connection check.
    try:
        source.check()
    except Exception as ex:
        return False, f"Configuration for {connector_name} is invalid: {ex}"

    return True, f"Configuration for {connector_name} is valid!"
185
186
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def list_connector_config_secrets(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector."),
    ],
) -> list[str]:
    """List all `config_secret_name` options that are known for the given connector.

    This can be used to find out which already-created config secret names are available
    for a given connector. The return value is a list of secret names, but it will not
    return the actual secret values.
    """
    # Only GSM-backed secret managers can enumerate connector secrets; the
    # trailing path segment of each GSM resource name is the secret name.
    return [
        secret_handle.secret_name.split("/")[-1]
        for secrets_mgr in _get_secret_sources()
        if isinstance(secrets_mgr, GoogleGSMSecretManager)
        for secret_handle in secrets_mgr.fetch_connector_secrets(connector_name)
    ]
214
215
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_dotenv_secrets() -> dict[str, list[str]]:
    """List the environment variable names declared in each registered .env file.

    This returns a dictionary mapping the .env file path to a list of environment
    variable names. The values of the environment variables are not returned.
    """
    # Only dotenv-backed managers with an actual file path can be enumerated.
    return {
        str(mgr.dotenv_path.resolve()): mgr.list_secrets_names()
        for mgr in _get_secret_sources()
        if isinstance(mgr, DotenvSecretManager) and mgr.dotenv_path
    }
233
234
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_source_streams(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[str]:
    """List all streams available in a source connector.

    This operation (generally) requires a valid configuration, including any required secrets.
    """
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    # Resolve config (layering any secret config) and apply it before discovery.
    source.set_config(
        resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )
    return source.get_available_streams()
299
300
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def get_source_stream_json_schema(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """List all properties for a specific stream in a source connector."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    # Resolve config (layering any secret config) and apply it before introspection.
    source.set_config(
        resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )
    return source.get_stream_json_schema(stream_name=stream_name)
366
367
@mcp_tool(
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def read_source_stream_records(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to read records from."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="The maximum number of records to read.",
            default=1000,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[dict[str, Any]] | str:
    """Get records from a source connector.

    Returns a list of record dicts on success, or a formatted error string
    (including a traceback) if anything fails along the way.
    """
    try:
        source: Source = _get_mcp_source(
            connector_name=source_connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
        config_dict = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
        # First we get a generator for the records in the specified stream.
        record_generator = source.get_records(stream_name)
        # Next we load a limited number of records from the generator into our list.
        records: list[dict[str, Any]] = list(islice(record_generator, max_records))

        # Fix: `file=` keyword was missing, which printed the repr of `sys.stderr`
        # to stdout instead of routing this status message to stderr.
        print(f"Retrieved {len(records)} records from stream '{stream_name}'", file=sys.stderr)

    except Exception as ex:
        tb_str = traceback.format_exc()
        # If any error occurs, we return a detailed error string instead of records.
        return (
            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
        )

    else:
        return records
456
457
@mcp_tool(
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def get_stream_previews(
    source_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str | None,
        Field(
            description=(
                "The streams to get previews for. "
                "Use '*' for all streams, or None for selected streams."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="The maximum number of sample records to return per stream.",
            default=10,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, list[dict[str, Any]] | str]:
    """Get sample records (previews) from streams in a source connector.

    This operation requires a valid configuration, including any required secrets.
    Returns a dictionary mapping stream names to lists of sample records, or an error
    message string if an error occurred for that stream.
    """
    source: Source = _get_mcp_source(
        connector_name=source_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    source.set_config(
        resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )

    stream_selection: list[str] | Literal["*"] | None = resolve_list_of_strings(
        streams
    )  # pyrefly: ignore[no-matching-overload]
    # Collapse a single-element ["*"] selection to the "*" wildcard sentinel.
    if stream_selection == ["*"]:
        stream_selection = "*"

    try:
        samples_result = source.get_samples(
            streams=stream_selection,
            limit=limit,
            on_error="ignore",
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return {
            "ERROR": f"Error getting stream previews from source '{source_name}': "
            f"{ex!r}, {ex!s}\n{tb_str}"
        }

    # A `None` dataset marks a stream whose samples could not be retrieved.
    return {
        stream_name: (
            f"Could not retrieve stream samples for stream '{stream_name}'"
            if dataset is None
            else list(dataset)
        )
        for stream_name, dataset in samples_result.items()
    }
568
569
@mcp_tool(
    destructive=False,
    extra_help_text=_CONFIG_HELP,
)
def sync_source_to_cache(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str,
        Field(
            description="The streams to sync.",
            default="suggested",
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> str:
    """Run a sync from a source connector to the default DuckDB cache."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    source.set_config(
        resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )
    cache = get_default_cache()

    stream_selection = resolve_list_of_strings(streams)
    if (
        stream_selection
        and len(stream_selection) == 1
        and stream_selection[0] in {"*", "suggested"}
    ):
        # Float '*' and 'suggested' to the top-level for special processing:
        stream_selection = stream_selection[0]

    if stream_selection == "suggested":
        # Default to all streams unless registry metadata names suggested ones.
        stream_selection = "*"
        try:
            metadata = get_connector_metadata(source_connector_name)
        except Exception:
            stream_selection = "*"  # Fallback to all streams if suggested streams fail.
        else:
            if metadata is not None:
                stream_selection = metadata.suggested_streams or "*"

    if isinstance(stream_selection, str) and stream_selection != "*":
        stream_selection = [stream_selection]  # Ensure streams is a list

    source.read(
        cache=cache,
        streams=stream_selection,
    )
    del cache  # Ensure the cache is closed properly

    return (
        f"Sync completed for '{source_connector_name}'!\n\n"
        "Data written to default DuckDB cache\n"
    )
667
668
class CachedDatasetInfo(BaseModel):
    """Class to hold information about a cached dataset."""

    stream_name: str
    """The name of the stream in the cache."""
    table_name: str  # Physical table name: the cache's table prefix + stream name.
    schema_name: str | None = None  # SQL schema containing the table, when the cache defines one.
676
677
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_cached_streams() -> list[CachedDatasetInfo]:
    """List all streams available in the default DuckDB cache."""
    cache: DuckDBCache = get_default_cache()
    prefix = cache.table_prefix or ""
    entries: list[CachedDatasetInfo] = []
    for stream_name in cache.streams:
        entries.append(
            CachedDatasetInfo(
                stream_name=stream_name,
                table_name=prefix + stream_name,
                schema_name=cache.schema_name,
            )
        )
    del cache  # Ensure the cache is closed properly
    return entries
696
697
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def describe_default_cache() -> dict[str, Any]:
    """Describe the currently configured default cache."""
    cache = get_default_cache()
    description: dict[str, Any] = {}
    description["cache_type"] = type(cache).__name__
    description["cache_dir"] = str(cache.cache_dir)
    description["cache_db_path"] = str(Path(cache.db_path).absolute())
    description["cached_streams"] = list(cache.streams.keys())
    return description
712
713
714def _is_safe_sql(sql_query: str) -> bool:
715    """Check if a SQL query is safe to execute.
716
717    For security reasons, we only allow read-only operations like SELECT, DESCRIBE, and SHOW.
718    Multi-statement queries (containing semicolons) are also disallowed for security.
719
720    Note: SQLAlchemy will also validate downstream, but this is a first-pass check.
721
722    Args:
723        sql_query: The SQL query to check
724
725    Returns:
726        True if the query is safe to execute, False otherwise
727    """
728    # Remove leading/trailing whitespace and convert to uppercase for checking
729    normalized_query = sql_query.strip().upper()
730
731    # Disallow multi-statement queries (containing semicolons)
732    # Note: We check the original query to catch semicolons anywhere, including in comments
733    if ";" in sql_query:
734        return False
735
736    # List of allowed SQL statement prefixes (read-only operations)
737    allowed_prefixes = (
738        "SELECT",
739        "DESCRIBE",
740        "DESC",  # Short form of DESCRIBE
741        "SHOW",
742        "EXPLAIN",  # Also safe - shows query execution plan
743    )
744
745    # Check if the query starts with any allowed prefix
746    return any(normalized_query.startswith(prefix) for prefix in allowed_prefixes)
747
748
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def run_sql_query(
    sql_query: Annotated[
        str,
        Field(description="The SQL query to execute."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="Maximum number of records to return.",
            default=1000,
        ),
    ],
) -> list[dict[str, Any]]:
    """Run a SQL query against the default cache.

    The dialect of SQL should match the dialect of the default cache.
    Use `describe_default_cache` to see the cache type.

    For DuckDB-type caches:
    - Use `SHOW TABLES` to list all tables.
    - Use `DESCRIBE <table_name>` to get the schema of a specific table

    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
    """
    # Guard clause: refuse anything that isn't a read-only statement.
    if not _is_safe_sql(sql_query):
        return [
            {
                "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
                "SELECT, DESCRIBE, SHOW, EXPLAIN",
                "SQL_QUERY": sql_query,
            }
        ]

    cache: DuckDBCache = get_default_cache()
    try:
        rows = cache.run_sql_query(
            sql_query,
            max_records=max_records,
        )
    except Exception as ex:
        return [
            {
                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
                "TRACEBACK": traceback.format_exc(),
                "SQL_QUERY": sql_query,
            }
        ]
    else:
        return rows
    finally:
        del cache  # Ensure the cache is closed properly
805
806
def register_local_tools(app: FastMCP) -> None:
    """Attach every `@mcp_tool`-decorated function in this module to the app.

    Args:
        app: FastMCP application instance
    """
    register_mcp_tools(app, mcp_module=__name__)
@mcp_tool(read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def validate_connector_config( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connector to validate.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the connector as a dict object or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the connector configuration.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> tuple[bool, str]:
109@mcp_tool(
110    read_only=True,
111    idempotent=True,
112    extra_help_text=_CONFIG_HELP,
113)
114def validate_connector_config(
115    connector_name: Annotated[
116        str,
117        Field(description="The name of the connector to validate."),
118    ],
119    config: Annotated[
120        dict | str | None,
121        Field(
122            description="The configuration for the connector as a dict object or JSON string.",
123            default=None,
124        ),
125    ],
126    config_file: Annotated[
127        str | Path | None,
128        Field(
129            description="Path to a YAML or JSON file containing the connector configuration.",
130            default=None,
131        ),
132    ],
133    config_secret_name: Annotated[
134        str | None,
135        Field(
136            description="The name of the secret containing the configuration.",
137            default=None,
138        ),
139    ],
140    override_execution_mode: Annotated[
141        Literal["docker", "python", "yaml", "auto"],
142        Field(
143            description="Optionally override the execution method to use for the connector. "
144            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
145            default="auto",
146        ),
147    ],
148    manifest_path: Annotated[
149        str | Path | None,
150        Field(
151            description="Path to a local YAML manifest file for declarative connectors.",
152            default=None,
153        ),
154    ],
155) -> tuple[bool, str]:
156    """Validate a connector configuration.
157
158    Returns a tuple of (is_valid: bool, message: str).
159    """
160    try:
161        source: Source = _get_mcp_source(
162            connector_name,
163            override_execution_mode=override_execution_mode,
164            manifest_path=manifest_path,
165        )
166    except Exception as ex:
167        return False, f"Failed to get connector '{connector_name}': {ex}"
168
169    try:
170        config_dict = resolve_connector_config(
171            config=config,
172            config_file=config_file,
173            config_secret_name=config_secret_name,
174            config_spec_jsonschema=source.config_spec,
175        )
176        source.set_config(config_dict)
177    except Exception as ex:
178        return False, f"Failed to resolve configuration for {connector_name}: {ex}"
179
180    try:
181        source.check()
182    except Exception as ex:
183        return False, f"Configuration for {connector_name} is invalid: {ex}"
184
185    return True, f"Configuration for {connector_name} is valid!"

Validate a connector configuration.

Returns a tuple of (is_valid: bool, message: str).

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

@mcp_tool(read_only=True, idempotent=True)
def list_connector_config_secrets( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connector.')]) -> list[str]:
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def list_connector_config_secrets(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector."),
    ],
) -> list[str]:
    """List the known `config_secret_name` options for the given connector.

    Scans the registered secret sources for secrets associated with the
    connector and returns their short names (the final path segment).
    Only the names are returned — never the secret values.
    """
    # Only GSM-backed managers can enumerate per-connector secrets.
    return [
        handle.secret_name.split("/")[-1]
        for manager in _get_secret_sources()
        if isinstance(manager, GoogleGSMSecretManager)
        for handle in manager.fetch_connector_secrets(connector_name)
    ]

List all config_secret_name options that are known for the given connector.

This can be used to find out which already-created config secret names are available for a given connector. The return value is a list of secret names, but it will not return the actual secret values.

@mcp_tool(read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def list_dotenv_secrets() -> dict[str, list[str]]:
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def list_dotenv_secrets() -> dict[str, list[str]]:
    """List all environment variable names declared within the configured .env files.

    Returns a mapping from each resolved .env file path to the list of
    environment variable names it declares. The values of the environment
    variables are not returned.
    """
    # NOTE: `extra_help_text=_CONFIG_HELP` was removed from the decorator —
    # this tool takes no config/config_secret_name parameters, so appending
    # the connector-config help text was a copy-paste mistake.
    result: dict[str, list[str]] = {}
    for secrets_mgr in _get_secret_sources():
        # Only dotenv-backed managers that actually point at a file are listed.
        if isinstance(secrets_mgr, DotenvSecretManager) and secrets_mgr.dotenv_path:
            result[str(secrets_mgr.dotenv_path.resolve())] = secrets_mgr.list_secrets_names()

    return result

List all environment variable names declared within the configured .env files.

This returns a dictionary mapping the .env file name to a list of environment
variable names. The values of the environment variables are not returned.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

@mcp_tool(read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def list_source_streams( source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> list[str]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_source_streams(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[str]:
    """Enumerate the streams offered by a source connector.

    A valid configuration — including any required secrets — is generally
    needed before the connector can report its available streams.
    """
    source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    # Resolve the layered config (inline / file / secret) against the
    # connector's spec, then apply it before querying the catalog.
    source.set_config(
        resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )
    return source.get_available_streams()

List all streams available in a source connector.

This operation (generally) requires a valid configuration, including any required secrets.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

@mcp_tool(read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def get_source_stream_json_schema( source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], stream_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the stream.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> dict[str, typing.Any]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def get_source_stream_json_schema(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Return the JSON schema describing the properties of one source stream."""
    source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    # Apply the resolved configuration before asking for the stream schema.
    source.set_config(
        resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )
    return source.get_stream_json_schema(stream_name=stream_name)

List all properties for a specific stream in a source connector.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

@mcp_tool(read_only=True, extra_help_text=_CONFIG_HELP)
def read_source_stream_records( source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], *, stream_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the stream to read records from.')], max_records: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=1000, description='The maximum number of records to read.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> list[dict[str, typing.Any]] | str:
@mcp_tool(
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def read_source_stream_records(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to read records from."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="The maximum number of records to read.",
            default=1000,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[dict[str, Any]] | str:
    """Get records from a source connector.

    Reads up to `max_records` records from the named stream. On failure, a
    diagnostic string (including a traceback) is returned instead of a list.
    """
    try:
        source: Source = _get_mcp_source(
            connector_name=source_connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
        config_dict = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
        # First we get a generator for the records in the specified stream.
        record_generator = source.get_records(stream_name)
        # Next we load a limited number of records from the generator into our list.
        records: list[dict[str, Any]] = list(islice(record_generator, max_records))

        # BUG FIX: `sys.stderr` was previously passed positionally, which printed
        # the message (and the stream object's repr) to stdout. It must be the
        # `file=` keyword so the status line goes to stderr.
        print(f"Retrieved {len(records)} records from stream '{stream_name}'", file=sys.stderr)

    except Exception as ex:
        tb_str = traceback.format_exc()
        # If any error occurs, we return a diagnostic string describing it.
        return (
            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
        )

    else:
        return records

Get records from a source connector.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

@mcp_tool(read_only=True, extra_help_text=_CONFIG_HELP)
def get_stream_previews( source_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], streams: typing.Annotated[list[str] | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description="The streams to get previews for. Use '*' for all streams, or None for selected streams.")], limit: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=10, description='The maximum number of sample records to return per stream.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> dict[str, list[dict[str, typing.Any]] | str]:
@mcp_tool(
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def get_stream_previews(
    source_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str | None,
        Field(
            description=(
                "The streams to get previews for. "
                "Use '*' for all streams, or None for selected streams."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="The maximum number of sample records to return per stream.",
            default=10,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, list[dict[str, Any]] | str]:
    """Get sample records (previews) from streams in a source connector.

    Requires a valid configuration, including any required secrets. The
    result maps each stream name to a list of sample records, or to an
    error-message string when samples could not be retrieved for it.
    """
    source: Source = _get_mcp_source(
        connector_name=source_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )

    source.set_config(
        resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )

    selection: list[str] | Literal["*"] | None = resolve_list_of_strings(
        streams
    )  # pyrefly: ignore[no-matching-overload]
    # A single-element ["*"] selection is shorthand for "all streams".
    if selection == ["*"]:
        selection = "*"

    try:
        samples = source.get_samples(
            streams=selection,
            limit=limit,
            on_error="ignore",
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return {
            "ERROR": f"Error getting stream previews from source '{source_name}': "
            f"{ex!r}, {ex!s}\n{tb_str}"
        }

    # A None dataset means sampling failed for that stream; report it inline.
    return {
        name: (
            list(dataset)
            if dataset is not None
            else f"Could not retrieve stream samples for stream '{name}'"
        )
        for name, dataset in samples.items()
    }

Get sample records (previews) from streams in a source connector.

This operation requires a valid configuration, including any required secrets.
Returns a dictionary mapping stream names to lists of sample records, or an error
message string if an error occurred for that stream.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

@mcp_tool(destructive=False, extra_help_text=_CONFIG_HELP)
def sync_source_to_cache( source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], streams: typing.Annotated[list[str] | str, FieldInfo(annotation=NoneType, required=False, default='suggested', description='The streams to sync.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> str:
@mcp_tool(
    destructive=False,
    extra_help_text=_CONFIG_HELP,
)
def sync_source_to_cache(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str,
        Field(
            description="The streams to sync.",
            default="suggested",
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> str:
    """Run a sync from a source connector to the default DuckDB cache."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    source.set_config(
        resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )
    cache = get_default_cache()

    selection = resolve_list_of_strings(streams)
    # Collapse a single special token ('*' or 'suggested') to a bare string
    # so the branches below can treat it as a mode rather than a stream name.
    if selection and len(selection) == 1 and selection[0] in {"*", "suggested"}:
        selection = selection[0]

    if isinstance(selection, str) and selection == "suggested":
        selection = "*"  # Default to all streams unless metadata provides a list.
        try:
            metadata = get_connector_metadata(
                source_connector_name,
            )
        except Exception:
            selection = "*"  # Fallback to all streams if the metadata lookup fails.
        else:
            if metadata is not None:
                selection = metadata.suggested_streams or "*"

    if isinstance(selection, str) and selection != "*":
        selection = [selection]  # Normalize a lone stream name into a list.

    source.read(
        cache=cache,
        streams=selection,
    )
    del cache  # Ensure the cache is closed properly

    return (
        f"Sync completed for '{source_connector_name}'!\n\n"
        "Data written to default DuckDB cache\n"
    )

Run a sync from a source connector to the default DuckDB cache.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

class CachedDatasetInfo(pydantic.main.BaseModel):
class CachedDatasetInfo(BaseModel):
    """Class to hold information about a cached dataset."""

    stream_name: str
    """The name of the stream in the cache."""
    # Physical table backing the stream (the cache's table prefix, if any,
    # is prepended to the stream name).
    table_name: str
    # SQL schema containing the table; None when the cache reports no schema.
    schema_name: str | None = None

Class to hold information about a cached dataset.

stream_name: str  (required)

The name of the stream in the cache.

table_name: str  (required)
schema_name: str | None = None
@mcp_tool(read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def list_cached_streams() -> list[CachedDatasetInfo]:
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def list_cached_streams() -> list[CachedDatasetInfo]:
    """List all streams available in the default DuckDB cache.

    Returns:
        One entry per cached stream, carrying the stream name, the physical
        table name (stream name plus any configured table prefix), and the
        schema the table lives in.
    """
    # NOTE: `_CONFIG_HELP` is intentionally not attached here — this tool
    # takes no `config`/`config_secret_name`/`manifest_path` parameters, so
    # that help text would be misleading in the generated tool description.
    cache: DuckDBCache = get_default_cache()
    try:
        prefix = cache.table_prefix or ""
        return [
            CachedDatasetInfo(
                stream_name=stream_name,
                table_name=prefix + stream_name,
                schema_name=cache.schema_name,
            )
            for stream_name in cache.streams
        ]
    finally:
        # Release our local reference promptly. `del` only drops the name
        # binding; it does not guarantee the cache connection is closed.
        del cache

List all streams available in the default DuckDB cache.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

@mcp_tool(read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def describe_default_cache() -> dict[str, typing.Any]:
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def describe_default_cache() -> dict[str, Any]:
    """Describe the currently configured default cache.

    Returns:
        A mapping with the cache class name, the cache directory, the
        absolute path to the cache database file, and the list of cached
        stream names.
    """
    # NOTE: `_CONFIG_HELP` is intentionally not attached here — this tool
    # takes no config-related parameters, so that help text does not apply.
    cache = get_default_cache()
    return {
        "cache_type": type(cache).__name__,
        "cache_dir": str(cache.cache_dir),
        "cache_db_path": str(Path(cache.db_path).absolute()),
        # Iterating the mapping directly yields its keys; `.keys()` is redundant.
        "cached_streams": list(cache.streams),
    }

Describe the currently configured default cache.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

@mcp_tool(read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def run_sql_query(sql_query: str, max_records: int = 1000) -> list[dict[str, typing.Any]]:
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def run_sql_query(
    sql_query: Annotated[
        str,
        Field(description="The SQL query to execute."),
    ],
    # Default lives on the parameter itself (pydantic v2 convention) so the
    # function is also callable directly without passing `max_records`;
    # previously the default was only declared inside `Field(...)`.
    max_records: Annotated[
        int,
        Field(description="Maximum number of records to return."),
    ] = 1000,
) -> list[dict[str, Any]]:
    """Run a SQL query against the default cache.

    The dialect of SQL should match the dialect of the default cache.
    Use `describe_default_cache` to see the cache type.

    For DuckDB-type caches:
    - Use `SHOW TABLES` to list all tables.
    - Use `DESCRIBE <table_name>` to get the schema of a specific table

    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
    """
    # Reject anything that is not a read-only statement before touching the cache.
    if not _is_safe_sql(sql_query):
        return [
            {
                "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
                "SELECT, DESCRIBE, SHOW, EXPLAIN",
                "SQL_QUERY": sql_query,
            }
        ]

    cache: DuckDBCache = get_default_cache()
    try:
        return cache.run_sql_query(
            sql_query,
            max_records=max_records,
        )
    except Exception as ex:
        # Deliberate boundary: report the failure to the MCP client as data
        # rather than raising, so the tool call itself does not error out.
        tb_str = traceback.format_exc()
        return [
            {
                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
                "TRACEBACK": tb_str,
                "SQL_QUERY": sql_query,
            }
        ]
    finally:
        # Release the local reference; `del` does not itself close the cache.
        del cache

Run a SQL query against the default cache.

The dialect of SQL should match the dialect of the default cache.
Use `describe_default_cache` to see the cache type.

For DuckDB-type caches:
- Use `SHOW TABLES` to list all tables.
- Use `DESCRIBE <table_name>` to get the schema of a specific table

For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

def register_local_tools(app: fastmcp.server.server.FastMCP) -> None:
def register_local_tools(app: FastMCP) -> None:
    """Register local tools with the FastMCP app.

    Args:
        app: FastMCP application instance
    """
    # Every @mcp_tool-decorated function in this module is discovered and
    # registered against `app` in a single pass.
    register_mcp_tools(app, mcp_module=__name__)

Register local tools with the FastMCP app.

Arguments:
  • app: FastMCP application instance