airbyte.mcp.local_ops

Local MCP operations.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Local MCP operations."""
  3
  4import sys
  5import traceback
  6from itertools import islice
  7from pathlib import Path
  8from typing import TYPE_CHECKING, Annotated, Any, Literal
  9
 10from fastmcp import FastMCP
 11from pydantic import BaseModel, Field
 12
 13from airbyte import get_source
 14from airbyte._util.meta import is_docker_installed
 15from airbyte.caches.util import get_default_cache
 16from airbyte.mcp._tool_utils import mcp_tool, register_tools
 17from airbyte.mcp._util import resolve_config, resolve_list_of_strings
 18from airbyte.secrets.config import _get_secret_sources
 19from airbyte.secrets.env_vars import DotenvSecretManager
 20from airbyte.secrets.google_gsm import GoogleGSMSecretManager
 21from airbyte.sources.base import Source
 22from airbyte.sources.registry import get_connector_metadata
 23
 24
 25if TYPE_CHECKING:
 26    from airbyte.caches.duckdb import DuckDBCache
 27
 28
# Shared help text appended (via `extra_help_text=`) to the descriptions of the
# MCP tools below that accept connector-configuration inputs
# (`config` / `config_file` / `config_secret_name` / `manifest_path`).
_CONFIG_HELP = """
You can provide `config` as JSON or a Path to a YAML/JSON file.
If a `dict` is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
`secret_reference::ENV_VAR_NAME`.

You can also provide a `config_secret_name` to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.

If `config_secret_name` is provided, it should point to a string
that contains valid JSON or YAML.

If both `config` and `config_secret_name` are provided, the
`config` will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.

For declarative connectors, you can provide a `manifest_path` to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
"""
 52
 53
 54def _get_mcp_source(
 55    connector_name: str,
 56    override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto",
 57    *,
 58    install_if_missing: bool = True,
 59    manifest_path: str | Path | None,
 60) -> Source:
 61    """Get the MCP source for a connector."""
 62    if manifest_path:
 63        override_execution_mode = "yaml"
 64    elif override_execution_mode == "auto" and is_docker_installed():
 65        override_execution_mode = "docker"
 66
 67    source: Source
 68    if override_execution_mode == "auto":
 69        # Use defaults with no overrides
 70        source = get_source(
 71            connector_name,
 72            install_if_missing=False,
 73            source_manifest=manifest_path or None,
 74        )
 75    elif override_execution_mode == "python":
 76        source = get_source(
 77            connector_name,
 78            use_python=True,
 79            install_if_missing=False,
 80            source_manifest=manifest_path or None,
 81        )
 82    elif override_execution_mode == "docker":
 83        source = get_source(
 84            connector_name,
 85            docker_image=True,
 86            install_if_missing=False,
 87            source_manifest=manifest_path or None,
 88        )
 89    elif override_execution_mode == "yaml":
 90        source = get_source(
 91            connector_name,
 92            source_manifest=manifest_path or True,
 93            install_if_missing=False,
 94        )
 95    else:
 96        raise ValueError(
 97            f"Unknown execution method: {override_execution_mode}. "
 98            "Expected one of: ['auto', 'docker', 'python', 'yaml']."
 99        )
100
101    # Ensure installed:
102    if install_if_missing:
103        source.executor.ensure_installation()
104
105    return source
106
107
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def validate_connector_config(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to validate."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the connector as a dict object or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the connector configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> tuple[bool, str]:
    """Validate a connector configuration.

    Returns a tuple of (is_valid: bool, message: str).
    """
    # Stage 1: resolve the connector executor. Any failure is reported to the
    # caller as a (False, message) result rather than an exception.
    try:
        source: Source = _get_mcp_source(
            connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
    except Exception as ex:
        return False, f"Failed to get connector '{connector_name}': {ex}"

    # Stage 2: merge the config inputs (dict / file / secret) and apply them.
    try:
        source.set_config(
            resolve_config(
                config=config,
                config_file=config_file,
                config_secret_name=config_secret_name,
                config_spec_jsonschema=source.config_spec,
            )
        )
    except Exception as ex:
        return False, f"Failed to resolve configuration for {connector_name}: {ex}"

    # Stage 3: run the connector's own `check` operation against the config.
    try:
        source.check()
    except Exception as ex:
        return False, f"Configuration for {connector_name} is invalid: {ex}"

    return True, f"Configuration for {connector_name} is valid!"
186
187
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
)
def list_connector_config_secrets(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector."),
    ],
) -> list[str]:
    """List all `config_secret_name` options that are known for the given connector.

    This can be used to find out which already-created config secret names are available
    for a given connector. The return value is a list of secret names, but it will not
    return the actual secret values.
    """
    # Only Google Secret Manager sources can enumerate per-connector secrets;
    # other secret managers are skipped.
    gsm_managers = (
        mgr for mgr in _get_secret_sources() if isinstance(mgr, GoogleGSMSecretManager)
    )
    return [
        # Keep only the final path segment of the fully-qualified secret name.
        handle.secret_name.split("/")[-1]
        for mgr in gsm_managers
        for handle in mgr.fetch_connector_secrets(connector_name)
    ]
216
217
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_dotenv_secrets() -> dict[str, list[str]]:
    """List all environment variable names declared within declared .env files.

    This returns a dictionary mapping the .env file name to a list of environment
    variable names. The values of the environment variables are not returned.
    """
    # Map each dotenv file's absolute path to the names (not values) it declares.
    # Managers without a dotenv_path are skipped.
    return {
        str(mgr.dotenv_path.resolve()): mgr.list_secrets_names()
        for mgr in _get_secret_sources()
        if isinstance(mgr, DotenvSecretManager) and mgr.dotenv_path
    }
236
237
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_source_streams(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[str]:
    """List all streams available in a source connector.

    This operation (generally) requires a valid configuration, including any required secrets.
    """
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    # Merge the config inputs (dict / file / secret) against the connector's
    # spec, then ask the configured source for its available stream names.
    source.set_config(
        resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )
    return source.get_available_streams()
303
304
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def get_source_stream_json_schema(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """List all properties for a specific stream in a source connector."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    # Apply the merged configuration, then fetch the requested stream's
    # JSON schema from the connector catalog.
    source.set_config(
        resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )
    return source.get_stream_json_schema(stream_name=stream_name)
371
372
@mcp_tool(
    domain="local",
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def read_source_stream_records(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to read records from."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="The maximum number of records to read.",
            default=1000,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[dict[str, Any]] | str:
    """Get records from a source connector.

    Reads up to `max_records` records from the named stream. On success, returns
    the records as a list of dicts. On any failure, returns a string containing
    the error (repr, str, and traceback) instead of raising, so MCP clients
    always receive a useful payload.
    """
    try:
        source: Source = _get_mcp_source(
            connector_name=source_connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
        config_dict = resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
        # First we get a generator for the records in the specified stream.
        record_generator = source.get_records(stream_name)
        # Next we load a limited number of records from the generator into our list.
        records: list[dict[str, Any]] = list(islice(record_generator, max_records))

        # BUG FIX: `file=` keyword was previously missing, so the stderr object
        # was printed to stdout as a second positional argument instead of
        # directing this status message to stderr.
        print(f"Retrieved {len(records)} records from stream '{stream_name}'", file=sys.stderr)

    except Exception as ex:
        tb_str = traceback.format_exc()
        # If any error occurs, return the full error details as a string.
        return (
            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
        )

    else:
        return records
462
463
@mcp_tool(
    domain="local",
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def get_stream_previews(
    source_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str | None,
        Field(
            description=(
                "The streams to get previews for. "
                "Use '*' for all streams, or None for selected streams."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="The maximum number of sample records to return per stream.",
            default=10,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, list[dict[str, Any]] | str]:
    """Get sample records (previews) from streams in a source connector.

    This operation requires a valid configuration, including any required secrets.
    Returns a dictionary mapping stream names to lists of sample records, or an error
    message string if an error occurred for that stream.
    """
    source: Source = _get_mcp_source(
        connector_name=source_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )

    source.set_config(
        resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )

    # Normalize the streams argument; a lone "*" entry collapses to the
    # top-level "*" sentinel understood by `get_samples`.
    streams_param: list[str] | Literal["*"] | None = resolve_list_of_strings(streams)
    if streams_param == ["*"]:
        streams_param = "*"

    try:
        samples_result = source.get_samples(
            streams=streams_param,
            limit=limit,
            on_error="ignore",
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return {
            "ERROR": f"Error getting stream previews from source '{source_name}': "
            f"{ex!r}, {ex!s}\n{tb_str}"
        }

    # Per-stream results: a missing dataset becomes an error string; otherwise
    # the dataset is materialized into a plain list of record dicts.
    return {
        stream_name: (
            f"Could not retrieve stream samples for stream '{stream_name}'"
            if dataset is None
            else list(dataset)
        )
        for stream_name, dataset in samples_result.items()
    }
575
576
@mcp_tool(
    domain="local",
    destructive=False,
    extra_help_text=_CONFIG_HELP,
)
def sync_source_to_cache(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str,
        Field(
            description="The streams to sync.",
            default="suggested",
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> str:
    """Run a sync from a source connector to the default DuckDB cache.

    Stream selection:
    - "suggested" (the default) resolves to the registry's suggested streams
      for the connector, falling back to all streams ("*") when the registry
      lookup fails or no suggestions exist.
    - "*" selects all streams.
    - Otherwise, the given stream name(s) are synced explicitly.

    Returns a short human-readable summary of the completed sync.
    """
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    cache = get_default_cache()

    streams = resolve_list_of_strings(streams)
    if streams and len(streams) == 1 and streams[0] in {"*", "suggested"}:
        # Float '*' and 'suggested' to the top-level for special processing:
        streams = streams[0]

    if isinstance(streams, str) and streams == "suggested":
        streams = "*"  # Fallback used unless the registry provides suggestions.
        try:
            metadata = get_connector_metadata(
                source_connector_name,
            )
        except Exception:
            # Registry lookup failed; keep the "*" fallback set above.
            # (Previously this branch redundantly re-assigned streams = "*".)
            pass
        else:
            if metadata is not None:
                streams = metadata.suggested_streams or "*"

    if isinstance(streams, str) and streams != "*":
        streams = [streams]  # Ensure streams is a list

    source.read(
        cache=cache,
        streams=streams,
    )
    del cache  # Ensure the cache is closed properly

    summary: str = f"Sync completed for '{source_connector_name}'!\n\n"
    summary += "Data written to default DuckDB cache\n"
    return summary
675
676
class CachedDatasetInfo(BaseModel):
    """Class to hold information about a cached dataset."""

    stream_name: str
    """The name of the stream in the cache."""
    table_name: str  # Table name including the cache's table prefix, if any.
    schema_name: str | None = None  # SQL schema containing the table, when set.
684
685
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_cached_streams() -> list[CachedDatasetInfo]:
    """List all streams available in the default DuckDB cache."""
    cache: DuckDBCache = get_default_cache()
    prefix = cache.table_prefix or ""
    datasets: list[CachedDatasetInfo] = []
    for name in cache.streams:
        datasets.append(
            CachedDatasetInfo(
                stream_name=name,
                # The physical table name carries the cache's table prefix.
                table_name=prefix + name,
                schema_name=cache.schema_name,
            )
        )
    del cache  # Ensure the cache is closed properly
    return datasets
705
706
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def describe_default_cache() -> dict[str, Any]:
    """Describe the currently configured default cache."""
    cache = get_default_cache()
    db_path = Path(cache.db_path).absolute()
    description: dict[str, Any] = {
        "cache_type": type(cache).__name__,
        "cache_dir": str(cache.cache_dir),
        "cache_db_path": str(db_path),
        "cached_streams": list(cache.streams.keys()),
    }
    return description
722
723
724def _is_safe_sql(sql_query: str) -> bool:
725    """Check if a SQL query is safe to execute.
726
727    For security reasons, we only allow read-only operations like SELECT, DESCRIBE, and SHOW.
728    Multi-statement queries (containing semicolons) are also disallowed for security.
729
730    Note: SQLAlchemy will also validate downstream, but this is a first-pass check.
731
732    Args:
733        sql_query: The SQL query to check
734
735    Returns:
736        True if the query is safe to execute, False otherwise
737    """
738    # Remove leading/trailing whitespace and convert to uppercase for checking
739    normalized_query = sql_query.strip().upper()
740
741    # Disallow multi-statement queries (containing semicolons)
742    # Note: We check the original query to catch semicolons anywhere, including in comments
743    if ";" in sql_query:
744        return False
745
746    # List of allowed SQL statement prefixes (read-only operations)
747    allowed_prefixes = (
748        "SELECT",
749        "DESCRIBE",
750        "DESC",  # Short form of DESCRIBE
751        "SHOW",
752        "EXPLAIN",  # Also safe - shows query execution plan
753    )
754
755    # Check if the query starts with any allowed prefix
756    return any(normalized_query.startswith(prefix) for prefix in allowed_prefixes)
757
758
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def run_sql_query(
    sql_query: Annotated[
        str,
        Field(description="The SQL query to execute."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="Maximum number of records to return.",
            default=1000,
        ),
    ],
) -> list[dict[str, Any]]:
    """Run a SQL query against the default cache.

    The dialect of SQL should match the dialect of the default cache.
    Use `describe_default_cache` to see the cache type.

    For DuckDB-type caches:
    - Use `SHOW TABLES` to list all tables.
    - Use `DESCRIBE <table_name>` to get the schema of a specific table

    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
    """
    # Refuse anything that isn't a single read-only statement.
    if not _is_safe_sql(sql_query):
        return [
            {
                "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
                "SELECT, DESCRIBE, SHOW, EXPLAIN",
                "SQL_QUERY": sql_query,
            }
        ]

    cache: DuckDBCache = get_default_cache()
    try:
        return cache.run_sql_query(sql_query, max_records=max_records)
    except Exception as ex:
        # Surface query failures as data so MCP clients always get a payload.
        trace = traceback.format_exc()
        return [
            {
                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
                "TRACEBACK": trace,
                "SQL_QUERY": sql_query,
            }
        ]
    finally:
        del cache  # Ensure the cache is closed properly
816
817
def register_local_ops_tools(app: FastMCP) -> None:
    """@private Register tools with the FastMCP app.

    This is an internal function and should not be called directly.
    """
    # Attaches the tools declared above with @mcp_tool(domain="local") to `app`.
    register_tools(app, domain="local")
@mcp_tool(domain='local', read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def validate_connector_config( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connector to validate.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the connector as a dict object or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the connector configuration.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> tuple[bool, str]:
109@mcp_tool(
110    domain="local",
111    read_only=True,
112    idempotent=True,
113    extra_help_text=_CONFIG_HELP,
114)
115def validate_connector_config(
116    connector_name: Annotated[
117        str,
118        Field(description="The name of the connector to validate."),
119    ],
120    config: Annotated[
121        dict | str | None,
122        Field(
123            description="The configuration for the connector as a dict object or JSON string.",
124            default=None,
125        ),
126    ],
127    config_file: Annotated[
128        str | Path | None,
129        Field(
130            description="Path to a YAML or JSON file containing the connector configuration.",
131            default=None,
132        ),
133    ],
134    config_secret_name: Annotated[
135        str | None,
136        Field(
137            description="The name of the secret containing the configuration.",
138            default=None,
139        ),
140    ],
141    override_execution_mode: Annotated[
142        Literal["docker", "python", "yaml", "auto"],
143        Field(
144            description="Optionally override the execution method to use for the connector. "
145            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
146            default="auto",
147        ),
148    ],
149    manifest_path: Annotated[
150        str | Path | None,
151        Field(
152            description="Path to a local YAML manifest file for declarative connectors.",
153            default=None,
154        ),
155    ],
156) -> tuple[bool, str]:
157    """Validate a connector configuration.
158
159    Returns a tuple of (is_valid: bool, message: str).
160    """
161    try:
162        source: Source = _get_mcp_source(
163            connector_name,
164            override_execution_mode=override_execution_mode,
165            manifest_path=manifest_path,
166        )
167    except Exception as ex:
168        return False, f"Failed to get connector '{connector_name}': {ex}"
169
170    try:
171        config_dict = resolve_config(
172            config=config,
173            config_file=config_file,
174            config_secret_name=config_secret_name,
175            config_spec_jsonschema=source.config_spec,
176        )
177        source.set_config(config_dict)
178    except Exception as ex:
179        return False, f"Failed to resolve configuration for {connector_name}: {ex}"
180
181    try:
182        source.check()
183    except Exception as ex:
184        return False, f"Configuration for {connector_name} is invalid: {ex}"
185
186    return True, f"Configuration for {connector_name} is valid!"

Validate a connector configuration.

Returns a tuple of (is_valid: bool, message: str).

@mcp_tool(domain='local', read_only=True, idempotent=True)
def list_connector_config_secrets( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connector.')]) -> list[str]:
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
)
def list_connector_config_secrets(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector."),
    ],
) -> list[str]:
    """List all `config_secret_name` options that are known for the given connector.

    Use this to discover which already-created config secret names exist for a
    connector. Only secret *names* are returned — never the secret values.
    """
    found: list[str] = []
    for manager in _get_secret_sources():
        # Only Google GSM managers support enumerating per-connector secrets.
        if not isinstance(manager, GoogleGSMSecretManager):
            continue
        for handle in manager.fetch_connector_secrets(connector_name):
            # Strip the resource-path prefix; keep only the final name segment.
            found.append(handle.secret_name.split("/")[-1])

    return found

List all config_secret_name options that are known for the given connector.

This can be used to find out which already-created config secret names are available for a given connector. The return value is a list of secret names, but it will not return the actual secret values.

@mcp_tool(domain='local', read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def list_dotenv_secrets() -> dict[str, list[str]]:
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_dotenv_secrets() -> dict[str, list[str]]:
    """List all environment variable names declared within the configured .env files.

    This returns a dictionary mapping each .env file path (fully resolved) to a list
    of environment variable names. The values of the environment variables are not
    returned.
    """
    result: dict[str, list[str]] = {}
    for secrets_mgr in _get_secret_sources():
        # Only dotenv-backed managers with a concrete file path are reported.
        if isinstance(secrets_mgr, DotenvSecretManager) and secrets_mgr.dotenv_path:
            result[str(secrets_mgr.dotenv_path.resolve())] = secrets_mgr.list_secrets_names()

    return result

List all environment variable names declared within the configured .env files.

This returns a dictionary mapping the .env file name to a list of environment variable names. The values of the environment variables are not returned.

@mcp_tool(domain='local', read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def list_source_streams( source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> list[str]:
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_source_streams(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[str]:
    """List the names of all streams offered by a source connector.

    A valid configuration — including any required secrets — is (generally)
    needed before the connector can report its streams.
    """
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    # Layer config sources (dict/file/secret) into a single resolved config.
    source.set_config(
        resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )
    return source.get_available_streams()

List all streams available in a source connector.

This operation (generally) requires a valid configuration, including any required secrets.

@mcp_tool(domain='local', read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def get_source_stream_json_schema( source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], stream_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the stream.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> dict[str, typing.Any]:
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def get_source_stream_json_schema(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Return the JSON schema for a specific stream in a source connector.

    The returned dict is the stream's record schema (including its properties).
    A valid configuration, including any required secrets, is generally needed.
    """
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    # Merge config inputs (dict/file/secret) and validate against the connector spec.
    config_dict = resolve_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    return source.get_stream_json_schema(stream_name=stream_name)

Return the JSON schema (including all properties) for a specific stream in a source connector.

@mcp_tool(domain='local', read_only=True, extra_help_text=_CONFIG_HELP)
def read_source_stream_records( source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], *, stream_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the stream to read records from.')], max_records: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=1000, description='The maximum number of records to read.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> list[dict[str, typing.Any]] | str:
@mcp_tool(
    domain="local",
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def read_source_stream_records(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to read records from."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="The maximum number of records to read.",
            default=1000,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[dict[str, Any]] | str:
    """Get records from a source connector.

    Reads up to `max_records` records from the named stream and returns them as a
    list of dicts. On any failure, a formatted error string (including the
    traceback) is returned instead of raising.
    """
    try:
        source: Source = _get_mcp_source(
            connector_name=source_connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
        config_dict = resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
        # First we get a generator for the records in the specified stream.
        record_generator = source.get_records(stream_name)
        # Next we load a limited number of records from the generator into our list.
        records: list[dict[str, Any]] = list(islice(record_generator, max_records))

        # BUG FIX: `sys.stderr` was previously passed as a second positional
        # argument (printing the stream object to stdout). Use `file=` so the
        # status message actually goes to stderr.
        print(f"Retrieved {len(records)} records from stream '{stream_name}'", file=sys.stderr)

    except Exception as ex:
        tb_str = traceback.format_exc()
        # On any error, return a descriptive error string rather than raising.
        return (
            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
        )

    else:
        return records

Get records from a source connector.

@mcp_tool(domain='local', read_only=True, extra_help_text=_CONFIG_HELP)
def get_stream_previews( source_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], streams: typing.Annotated[list[str] | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description="The streams to get previews for. Use '*' for all streams, or None for selected streams.")], limit: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=10, description='The maximum number of sample records to return per stream.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> dict[str, list[dict[str, typing.Any]] | str]:
@mcp_tool(
    domain="local",
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def get_stream_previews(
    source_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str | None,
        Field(
            description=(
                "The streams to get previews for. "
                "Use '*' for all streams, or None for selected streams."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="The maximum number of sample records to return per stream.",
            default=10,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, list[dict[str, Any]] | str]:
    """Get sample records (previews) from streams in a source connector.

    A valid configuration, including any required secrets, is required. The
    result maps each stream name to its list of sample records, or to an error
    message string when that stream could not be sampled.
    """
    source: Source = _get_mcp_source(
        connector_name=source_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )

    # Resolve config from dict/file/secret inputs, then apply it.
    source.set_config(
        resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )

    requested: list[str] | Literal["*"] | None = resolve_list_of_strings(
        streams
    )  # pyrefly: ignore[no-matching-overload]
    # A lone "*" entry means "all streams" — collapse it to the sentinel string.
    if requested and len(requested) == 1 and requested[0] == "*":
        requested = "*"

    try:
        samples_result = source.get_samples(
            streams=requested,
            limit=limit,
            on_error="ignore",
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return {
            "ERROR": f"Error getting stream previews from source '{source_name}': "
            f"{ex!r}, {ex!s}\n{tb_str}"
        }

    # A None dataset means sampling failed for that stream (errors were ignored).
    return {
        name: (
            f"Could not retrieve stream samples for stream '{name}'"
            if dataset is None
            else list(dataset)
        )
        for name, dataset in samples_result.items()
    }

Get sample records (previews) from streams in a source connector.

This operation requires a valid configuration, including any required secrets. Returns a dictionary mapping stream names to lists of sample records, or an error message string if an error occurred for that stream.

@mcp_tool(domain='local', destructive=False, extra_help_text=_CONFIG_HELP)
def sync_source_to_cache( source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], streams: typing.Annotated[list[str] | str, FieldInfo(annotation=NoneType, required=False, default='suggested', description='The streams to sync.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> str:
@mcp_tool(
    domain="local",
    destructive=False,
    extra_help_text=_CONFIG_HELP,
)
def sync_source_to_cache(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str,
        Field(
            description="The streams to sync.",
            default="suggested",
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> str:
    """Run a sync from a source connector to the default DuckDB cache."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    source.set_config(
        resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )
    cache = get_default_cache()

    selection: list[str] | str = resolve_list_of_strings(streams)
    if selection and len(selection) == 1 and selection[0] in {"*", "suggested"}:
        # Promote a lone '*' or 'suggested' entry to the sentinel string form.
        selection = selection[0]

    if selection == "suggested":
        # Default to all streams unless the registry supplies suggested streams.
        selection = "*"
        try:
            metadata = get_connector_metadata(
                source_connector_name,
            )
        except Exception:
            selection = "*"  # Fallback to all streams if suggested streams fail.
        else:
            if metadata is not None:
                selection = metadata.suggested_streams or "*"

    if isinstance(selection, str) and selection != "*":
        selection = [selection]  # A single bare stream name becomes a one-item list.

    source.read(
        cache=cache,
        streams=selection,
    )
    del cache  # Ensure the cache is closed properly

    return (
        f"Sync completed for '{source_connector_name}'!\n\n"
        "Data written to default DuckDB cache\n"
    )

Run a sync from a source connector to the default DuckDB cache.

class CachedDatasetInfo(pydantic.main.BaseModel):
class CachedDatasetInfo(BaseModel):
    """Class to hold information about a cached dataset."""

    stream_name: str
    """The name of the stream in the cache."""
    # Name of the backing table — built elsewhere as (table_prefix or "") + stream_name.
    table_name: str
    # SQL schema holding the table; None when no schema applies.
    schema_name: str | None = None

Class to hold information about a cached dataset.

stream_name: str

The name of the stream in the cache.

table_name: str
schema_name: str | None
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
@mcp_tool(domain='local', read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def list_cached_streams() -> list[CachedDatasetInfo]:
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_cached_streams() -> list[CachedDatasetInfo]:
    """List all streams available in the default DuckDB cache."""
    cache: DuckDBCache = get_default_cache()
    prefix = cache.table_prefix or ""
    infos: list[CachedDatasetInfo] = []
    for name in cache.streams:
        infos.append(
            CachedDatasetInfo(
                stream_name=name,
                table_name=prefix + name,
                schema_name=cache.schema_name,
            )
        )
    del cache  # Ensure the cache is closed properly
    return infos

List all streams available in the default DuckDB cache.

@mcp_tool(domain='local', read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def describe_default_cache() -> dict[str, typing.Any]:
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def describe_default_cache() -> dict[str, Any]:
    """Describe the currently configured default cache."""
    cache = get_default_cache()
    db_path = Path(cache.db_path).absolute()
    return {
        "cache_type": type(cache).__name__,
        "cache_dir": str(cache.cache_dir),
        "cache_db_path": str(db_path),
        "cached_streams": list(cache.streams.keys()),
    }

Describe the currently configured default cache.

@mcp_tool(domain='local', read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def run_sql_query( sql_query: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The SQL query to execute.')], max_records: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=1000, description='Maximum number of records to return.')]) -> list[dict[str, typing.Any]]:
@mcp_tool(
    domain="local",
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def run_sql_query(
    sql_query: Annotated[
        str,
        Field(description="The SQL query to execute."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="Maximum number of records to return.",
            default=1000,
        ),
    ],
) -> list[dict[str, Any]]:
    """Run a SQL query against the default cache.

    The dialect of SQL should match the dialect of the default cache.
    Use `describe_default_cache` to see the cache type.

    For DuckDB-type caches:
    - Use `SHOW TABLES` to list all tables.
    - Use `DESCRIBE <table_name>` to get the schema of a specific table

    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
    """
    # Guard: reject anything that isn't a read-only statement.
    if not _is_safe_sql(sql_query):
        return [
            {
                "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
                "SELECT, DESCRIBE, SHOW, EXPLAIN",
                "SQL_QUERY": sql_query,
            }
        ]

    cache: DuckDBCache = get_default_cache()
    try:
        return cache.run_sql_query(sql_query, max_records=max_records)
    except Exception as ex:
        # Surface failures as structured error rows rather than raising.
        details = traceback.format_exc()
        return [
            {
                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
                "TRACEBACK": details,
                "SQL_QUERY": sql_query,
            }
        ]
    finally:
        del cache  # Ensure the cache is closed properly

Run a SQL query against the default cache.

The dialect of SQL should match the dialect of the default cache. Use describe_default_cache to see the cache type.

For DuckDB-type caches:

  • Use SHOW TABLES to list all tables.
  • Use DESCRIBE <table_name> to get the schema of a specific table

For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.