airbyte.mcp.local_ops

Local MCP operations.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Local MCP operations."""
  3
  4import sys
  5import traceback
  6from itertools import islice
  7from pathlib import Path
  8from typing import TYPE_CHECKING, Annotated, Any, Literal
  9
 10from fastmcp import FastMCP
 11from pydantic import BaseModel, Field
 12
 13from airbyte import get_source
 14from airbyte._util.meta import is_docker_installed
 15from airbyte.caches.util import get_default_cache
 16from airbyte.mcp._util import resolve_config, resolve_list_of_strings
 17from airbyte.secrets.config import _get_secret_sources
 18from airbyte.secrets.env_vars import DotenvSecretManager
 19from airbyte.secrets.google_gsm import GoogleGSMSecretManager
 20from airbyte.sources.base import Source
 21from airbyte.sources.registry import get_connector_metadata
 22
 23
 24if TYPE_CHECKING:
 25    from airbyte.caches.duckdb import DuckDBCache
 26
 27
# Shared help text for the common `config` / `config_secret_name` / `manifest_path`
# parameters. Appended verbatim to each tool description registered in
# `register_local_ops_tools`, so this string is runtime data — edits here change
# the descriptions surfaced to MCP clients.
_CONFIG_HELP = """
You can provide `config` as JSON or a Path to a YAML/JSON file.
If a `dict` is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
`secret_reference::ENV_VAR_NAME`.

You can also provide a `config_secret_name` to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.

If `config_secret_name` is provided, it should point to a string
that contains valid JSON or YAML.

If both `config` and `config_secret_name` are provided, the
`config` will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.

For declarative connectors, you can provide a `manifest_path` to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
"""
 51
 52
 53def _get_mcp_source(
 54    connector_name: str,
 55    override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto",
 56    *,
 57    install_if_missing: bool = True,
 58    manifest_path: str | Path | None,
 59) -> Source:
 60    """Get the MCP source for a connector."""
 61    if manifest_path:
 62        override_execution_mode = "yaml"
 63    elif override_execution_mode == "auto" and is_docker_installed():
 64        override_execution_mode = "docker"
 65
 66    source: Source
 67    if override_execution_mode == "auto":
 68        # Use defaults with no overrides
 69        source = get_source(
 70            connector_name,
 71            install_if_missing=False,
 72            source_manifest=manifest_path or None,
 73        )
 74    elif override_execution_mode == "python":
 75        source = get_source(
 76            connector_name,
 77            use_python=True,
 78            install_if_missing=False,
 79            source_manifest=manifest_path or None,
 80        )
 81    elif override_execution_mode == "docker":
 82        source = get_source(
 83            connector_name,
 84            docker_image=True,
 85            install_if_missing=False,
 86            source_manifest=manifest_path or None,
 87        )
 88    elif override_execution_mode == "yaml":
 89        source = get_source(
 90            connector_name,
 91            source_manifest=manifest_path or True,
 92            install_if_missing=False,
 93        )
 94    else:
 95        raise ValueError(
 96            f"Unknown execution method: {override_execution_mode}. "
 97            "Expected one of: ['auto', 'docker', 'python', 'yaml']."
 98        )
 99
100    # Ensure installed:
101    if install_if_missing:
102        source.executor.ensure_installation()
103
104    return source
105
106
# @app.tool()  # << deferred
def validate_connector_config(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to validate."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the connector as a dict object or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the connector configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> tuple[bool, str]:
    """Validate a connector configuration.

    Returns a tuple of (is_valid: bool, message: str).
    """
    # Each stage returns a (False, reason) tuple on failure rather than raising,
    # so MCP clients always receive a structured result.
    try:
        source: Source = _get_mcp_source(
            connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
    except Exception as ex:
        return False, f"Failed to get connector '{connector_name}': {ex}"

    try:
        source.set_config(
            resolve_config(
                config=config,
                config_file=config_file,
                config_secret_name=config_secret_name,
                config_spec_jsonschema=source.config_spec,
            )
        )
    except Exception as ex:
        return False, f"Failed to resolve configuration for {connector_name}: {ex}"

    # Run the connector's own `check` operation against the resolved config.
    try:
        source.check()
    except Exception as ex:
        return False, f"Configuration for {connector_name} is invalid: {ex}"

    return True, f"Configuration for {connector_name} is valid!"
180
181
# @app.tool()  # << deferred
def list_connector_config_secrets(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector."),
    ],
) -> list[str]:
    """List all `config_secret_name` options that are known for the given connector.

    This can be used to find out which already-created config secret names are available
    for a given connector. The return value is a list of secret names, but it will not
    return the actual secret values.
    """
    secret_names: list[str] = []
    for secrets_mgr in _get_secret_sources():
        # Only Google Secret Manager sources support listing connector secrets.
        if not isinstance(secrets_mgr, GoogleGSMSecretManager):
            continue
        for secret_handle in secrets_mgr.fetch_connector_secrets(connector_name):
            # Keep only the final path segment: the bare secret name.
            secret_names.append(secret_handle.secret_name.split("/")[-1])

    return secret_names
206
207
def list_dotenv_secrets() -> dict[str, list[str]]:
    """List all environment variable names declared within declared .env files.

    This returns a dictionary mapping the .env file name to a list of environment
    variable names. The values of the environment variables are not returned.
    """
    # Only dotenv-backed secret managers with a configured path are reported.
    return {
        str(secrets_mgr.dotenv_path.resolve()): secrets_mgr.list_secrets_names()
        for secrets_mgr in _get_secret_sources()
        if isinstance(secrets_mgr, DotenvSecretManager) and secrets_mgr.dotenv_path
    }
220
221
# @app.tool()  # << deferred
def list_source_streams(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[str]:
    """List all streams available in a source connector.

    This operation (generally) requires a valid configuration, including any required secrets.
    """
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    # Resolve and apply the connector configuration before asking for streams.
    source.set_config(
        resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )
    return source.get_available_streams()
282
283
# @app.tool()  # << deferred
def get_source_stream_json_schema(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """List all properties for a specific stream in a source connector."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    # Resolve and apply the connector configuration before reading the schema.
    source.set_config(
        resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )
    return source.get_stream_json_schema(stream_name=stream_name)
345
346
# @app.tool()  # << deferred
def read_source_stream_records(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to read records from."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="The maximum number of records to read.",
            default=1000,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[dict[str, Any]] | str:
    """Get records from a source connector."""
    try:
        source: Source = _get_mcp_source(
            connector_name=source_connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
        config_dict = resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
        # First we get a generator for the records in the specified stream.
        record_generator = source.get_records(stream_name)
        # Next we load a limited number of records from the generator into our list.
        records: list[dict[str, Any]] = list(islice(record_generator, max_records))

        # Fix: the original passed `sys.stderr` positionally, which printed the
        # stream object itself to stdout. `file=` routes the message to stderr.
        print(f"Retrieved {len(records)} records from stream '{stream_name}'", file=sys.stderr)

    except Exception as ex:
        tb_str = traceback.format_exc()
        # If any error occurs, return a diagnostic string (including the
        # traceback) instead of raising, so MCP clients see what went wrong.
        return (
            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
        )

    else:
        return records
432
433
# @app.tool()  # << deferred
def get_stream_previews(
    source_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str | None,
        Field(
            description=(
                "The streams to get previews for. "
                "Use '*' for all streams, or None for selected streams."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="The maximum number of sample records to return per stream.",
            default=10,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, list[dict[str, Any]] | str]:
    """Get sample records (previews) from streams in a source connector.

    This operation requires a valid configuration, including any required secrets.
    Returns a dictionary mapping stream names to lists of sample records, or an error
    message string if an error occurred for that stream.
    """
    source: Source = _get_mcp_source(
        connector_name=source_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )

    source.set_config(
        resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )

    # Collapse a single-element ["*"] selection into the literal "*" sentinel
    # expected by `get_samples`.
    streams_param: list[str] | Literal["*"] | None = resolve_list_of_strings(streams)
    if streams_param == ["*"]:
        streams_param = "*"

    try:
        samples_result = source.get_samples(
            streams=streams_param,
            limit=limit,
            on_error="ignore",
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return {
            "ERROR": f"Error getting stream previews from source '{source_name}': "
            f"{ex!r}, {ex!s}\n{tb_str}"
        }

    # Streams whose samples could not be fetched come back as None; report
    # those with a per-stream message instead of a record list.
    return {
        stream_name: (
            list(dataset)
            if dataset is not None
            else f"Could not retrieve stream samples for stream '{stream_name}'"
        )
        for stream_name, dataset in samples_result.items()
    }
539
540
# @app.tool()  # << deferred
def sync_source_to_cache(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str,
        Field(
            description="The streams to sync.",
            default="suggested",
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> str:
    """Run a sync from a source connector to the default DuckDB cache."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    cache = get_default_cache()

    streams = resolve_list_of_strings(streams)
    if streams and len(streams) == 1 and streams[0] in {"*", "suggested"}:
        # Float '*' and 'suggested' to the top-level for special processing:
        streams = streams[0]

    if isinstance(streams, str) and streams == "suggested":
        # Default to all streams; upgraded below if the registry provides a
        # suggested-streams list. (The original code redundantly re-assigned
        # "*" in the except branch as well.)
        streams = "*"
        try:
            metadata = get_connector_metadata(
                source_connector_name,
            )
        except Exception:
            # Metadata lookup is best-effort; keep the "*" fallback set above.
            pass
        else:
            if metadata is not None:
                streams = metadata.suggested_streams or "*"

    if isinstance(streams, str) and streams != "*":
        streams = [streams]  # Ensure streams is a list

    source.read(
        cache=cache,
        streams=streams,
    )
    # Drop our reference to the cache handle before building the summary.
    # (Note: `del` only releases the local name; it does not force a close.)
    del cache

    summary: str = f"Sync completed for '{source_connector_name}'!\n\n"
    summary += "Data written to default DuckDB cache\n"
    return summary
635
636
class CachedDatasetInfo(BaseModel):
    """Class to hold information about a cached dataset."""

    stream_name: str
    """The name of the stream in the cache."""
    # Physical table name: the stream name plus any cache-configured table
    # prefix (see `list_cached_streams`).
    table_name: str
    # SQL schema containing the table, taken from the cache's `schema_name`;
    # may be None.
    schema_name: str | None = None
644
645
# @app.tool()  # << deferred
def list_cached_streams() -> list[CachedDatasetInfo]:
    """List all streams available in the default DuckDB cache."""
    cache: DuckDBCache = get_default_cache()
    prefix = cache.table_prefix or ""
    datasets: list[CachedDatasetInfo] = []
    for stream_name in cache.streams:
        datasets.append(
            CachedDatasetInfo(
                stream_name=stream_name,
                # Physical table name is the stream name plus any configured prefix.
                table_name=prefix + stream_name,
                schema_name=cache.schema_name,
            )
        )
    del cache  # Ensure the cache is closed properly
    return datasets
660
661
# @app.tool()  # << deferred
def describe_default_cache() -> dict[str, Any]:
    """Describe the currently configured default cache."""
    cache = get_default_cache()
    db_path = Path(cache.db_path).absolute()
    description: dict[str, Any] = {
        "cache_type": type(cache).__name__,
        "cache_dir": str(cache.cache_dir),
        "cache_db_path": str(db_path),
        "cached_streams": list(cache.streams.keys()),
    }
    return description
672
673
674def _is_safe_sql(sql_query: str) -> bool:
675    """Check if a SQL query is safe to execute.
676
677    For security reasons, we only allow read-only operations like SELECT, DESCRIBE, and SHOW.
678    Multi-statement queries (containing semicolons) are also disallowed for security.
679
680    Note: SQLAlchemy will also validate downstream, but this is a first-pass check.
681
682    Args:
683        sql_query: The SQL query to check
684
685    Returns:
686        True if the query is safe to execute, False otherwise
687    """
688    # Remove leading/trailing whitespace and convert to uppercase for checking
689    normalized_query = sql_query.strip().upper()
690
691    # Disallow multi-statement queries (containing semicolons)
692    # Note: We check the original query to catch semicolons anywhere, including in comments
693    if ";" in sql_query:
694        return False
695
696    # List of allowed SQL statement prefixes (read-only operations)
697    allowed_prefixes = (
698        "SELECT",
699        "DESCRIBE",
700        "DESC",  # Short form of DESCRIBE
701        "SHOW",
702        "EXPLAIN",  # Also safe - shows query execution plan
703    )
704
705    # Check if the query starts with any allowed prefix
706    return any(normalized_query.startswith(prefix) for prefix in allowed_prefixes)
707
708
# @app.tool()  # << deferred
def run_sql_query(
    sql_query: Annotated[
        str,
        Field(description="The SQL query to execute."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="Maximum number of records to return.",
            default=1000,
        ),
    ],
) -> list[dict[str, Any]]:
    """Run a SQL query against the default cache.

    The dialect of SQL should match the dialect of the default cache.
    Use `describe_default_cache` to see the cache type.

    For DuckDB-type caches:
    - Use `SHOW TABLES` to list all tables.
    - Use `DESCRIBE <table_name>` to get the schema of a specific table

    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
    """
    # Reject anything that is not a single read-only statement before touching
    # the cache at all.
    if not _is_safe_sql(sql_query):
        return [
            {
                "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
                "SELECT, DESCRIBE, SHOW, EXPLAIN",
                "SQL_QUERY": sql_query,
            }
        ]

    cache: DuckDBCache = get_default_cache()
    try:
        result = cache.run_sql_query(
            sql_query,
            max_records=max_records,
        )
    except Exception as ex:
        # Surface the failure as a structured error row rather than raising.
        return [
            {
                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
                "TRACEBACK": traceback.format_exc(),
                "SQL_QUERY": sql_query,
            }
        ]
    else:
        return result
    finally:
        del cache  # Ensure the cache is closed properly
761
762
def register_local_ops_tools(app: FastMCP) -> None:
    """@private Register tools with the FastMCP app.

    This is an internal function and should not be called directly.
    """
    # This tool takes no config arguments, so it skips the shared help text.
    app.tool(list_connector_config_secrets)

    config_based_tools = (
        describe_default_cache,
        get_source_stream_json_schema,
        get_stream_previews,
        list_cached_streams,
        list_dotenv_secrets,
        list_source_streams,
        read_source_stream_records,
        run_sql_query,
        sync_source_to_cache,
        validate_connector_config,
    )
    for tool_fn in config_based_tools:
        # Register each tool, appending the shared config help text to its
        # docstring-derived description.
        docstring = (tool_fn.__doc__ or "").rstrip()
        app.tool(
            tool_fn,
            description=docstring + "\n" + _CONFIG_HELP,
        )
def validate_connector_config( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connector to validate.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the connector as a dict object or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the connector configuration.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> tuple[bool, str]:
109def validate_connector_config(
110    connector_name: Annotated[
111        str,
112        Field(description="The name of the connector to validate."),
113    ],
114    config: Annotated[
115        dict | str | None,
116        Field(
117            description="The configuration for the connector as a dict object or JSON string.",
118            default=None,
119        ),
120    ],
121    config_file: Annotated[
122        str | Path | None,
123        Field(
124            description="Path to a YAML or JSON file containing the connector configuration.",
125            default=None,
126        ),
127    ],
128    config_secret_name: Annotated[
129        str | None,
130        Field(
131            description="The name of the secret containing the configuration.",
132            default=None,
133        ),
134    ],
135    override_execution_mode: Annotated[
136        Literal["docker", "python", "yaml", "auto"],
137        Field(
138            description="Optionally override the execution method to use for the connector. "
139            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
140            default="auto",
141        ),
142    ],
143    manifest_path: Annotated[
144        str | Path | None,
145        Field(
146            description="Path to a local YAML manifest file for declarative connectors.",
147            default=None,
148        ),
149    ],
150) -> tuple[bool, str]:
151    """Validate a connector configuration.
152
153    Returns a tuple of (is_valid: bool, message: str).
154    """
155    try:
156        source: Source = _get_mcp_source(
157            connector_name,
158            override_execution_mode=override_execution_mode,
159            manifest_path=manifest_path,
160        )
161    except Exception as ex:
162        return False, f"Failed to get connector '{connector_name}': {ex}"
163
164    try:
165        config_dict = resolve_config(
166            config=config,
167            config_file=config_file,
168            config_secret_name=config_secret_name,
169            config_spec_jsonschema=source.config_spec,
170        )
171        source.set_config(config_dict)
172    except Exception as ex:
173        return False, f"Failed to resolve configuration for {connector_name}: {ex}"
174
175    try:
176        source.check()
177    except Exception as ex:
178        return False, f"Configuration for {connector_name} is invalid: {ex}"
179
180    return True, f"Configuration for {connector_name} is valid!"

Validate a connector configuration.

Returns a tuple of (is_valid: bool, message: str).

def list_connector_config_secrets( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connector.')]) -> list[str]:
def list_connector_config_secrets(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector."),
    ],
) -> list[str]:
    """List all `config_secret_name` options that are known for the given connector.

    Useful for discovering which already-created config secret names exist for a
    connector. Only secret *names* are returned — never the secret values.
    """
    found: list[str] = []
    for manager in _get_secret_sources():
        if not isinstance(manager, GoogleGSMSecretManager):
            continue
        for handle in manager.fetch_connector_secrets(connector_name):
            # GSM secret names are fully-qualified resource paths; keep only
            # the final path segment (the short secret name).
            found.append(handle.secret_name.split("/")[-1])

    return found

List all config_secret_name options that are known for the given connector.

This can be used to find out which already-created config secret names are available for a given connector. The return value is a list of secret names, but it will not return the actual secret values.

def list_dotenv_secrets() -> dict[str, list[str]]:
def list_dotenv_secrets() -> dict[str, list[str]]:
    """List the environment variable names declared in each registered .env file.

    Returns a mapping from the resolved .env file path to the list of variable
    names it declares. The variable values themselves are never returned.
    """
    dotenv_vars: dict[str, list[str]] = {}
    for manager in _get_secret_sources():
        # Only dotenv-backed managers with a concrete file path are reported.
        if isinstance(manager, DotenvSecretManager) and manager.dotenv_path:
            dotenv_vars[str(manager.dotenv_path.resolve())] = manager.list_secrets_names()

    return dotenv_vars

List all environment variable names declared within the registered .env files.

This returns a dictionary mapping the .env file name to a list of environment variable names. The values of the environment variables are not returned.

def list_source_streams( source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> list[str]:
def list_source_streams(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[str]:
    """List all streams available in a source connector.

    This operation (generally) requires a valid configuration, including any required secrets.
    """
    # Build the source executor first so its config spec can drive config resolution.
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    resolved_config = resolve_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(resolved_config)
    return source.get_available_streams()

List all streams available in a source connector.

This operation (generally) requires a valid configuration, including any required secrets.

def get_source_stream_json_schema( source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], stream_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the stream.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> dict[str, typing.Any]:
def get_source_stream_json_schema(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Return the JSON schema for a specific stream in a source connector."""
    # The configured source is needed both for config validation (via its spec)
    # and for the per-stream schema lookup.
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    resolved_config = resolve_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(resolved_config)
    return source.get_stream_json_schema(stream_name=stream_name)

Return the JSON schema for a specific stream in a source connector.

def read_source_stream_records( source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], *, stream_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the stream to read records from.')], max_records: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=1000, description='The maximum number of records to read.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> list[dict[str, typing.Any]] | str:
def read_source_stream_records(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to read records from."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="The maximum number of records to read.",
            default=1000,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[dict[str, Any]] | str:
    """Get records from a source connector.

    Reads up to `max_records` records from the named stream. On success, returns
    the records as a list of dicts. On any failure, returns a human-readable
    error string (including the traceback) instead of raising, so the MCP caller
    always receives a result.
    """
    try:
        source: Source = _get_mcp_source(
            connector_name=source_connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
        config_dict = resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
        # First we get a generator for the records in the specified stream.
        record_generator = source.get_records(stream_name)
        # Next we load a limited number of records from the generator into our list.
        records: list[dict[str, Any]] = list(islice(record_generator, max_records))

        # Fix: `sys.stderr` was previously passed as a positional argument,
        # which printed the stream object itself to stdout. It must be passed
        # via the `file=` keyword to route the progress message to stderr.
        print(f"Retrieved {len(records)} records from stream '{stream_name}'", file=sys.stderr)

    except Exception as ex:
        tb_str = traceback.format_exc()
        # On any error, return a diagnostic string rather than raising.
        return (
            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
        )

    else:
        return records

Get records from a source connector.

def get_stream_previews( source_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], streams: typing.Annotated[list[str] | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description="The streams to get previews for. Use '*' for all streams, or None for selected streams.")], limit: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=10, description='The maximum number of sample records to return per stream.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> dict[str, list[dict[str, typing.Any]] | str]:
def get_stream_previews(
    source_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str | None,
        Field(
            description=(
                "The streams to get previews for. "
                "Use '*' for all streams, or None for selected streams."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="The maximum number of sample records to return per stream.",
            default=10,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, list[dict[str, Any]] | str]:
    """Get sample records (previews) from streams in a source connector.

    This operation requires a valid configuration, including any required secrets.
    Returns a dictionary mapping stream names to lists of sample records, or an error
    message string if an error occurred for that stream.
    """
    source: Source = _get_mcp_source(
        connector_name=source_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    source.set_config(
        resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )

    # Normalize the stream selection: a single-element ["*"] collapses to the
    # special "*" (all streams) marker expected by `get_samples`.
    selected: list[str] | Literal["*"] | None = resolve_list_of_strings(streams)
    if selected == ["*"]:
        selected = "*"

    try:
        samples_result = source.get_samples(
            streams=selected,
            limit=limit,
            on_error="ignore",
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return {
            "ERROR": f"Error getting stream previews from source '{source_name}': "
            f"{ex!r}, {ex!s}\n{tb_str}"
        }

    previews: dict[str, list[dict[str, Any]] | str] = {}
    for name, dataset in samples_result.items():
        previews[name] = (
            f"Could not retrieve stream samples for stream '{name}'"
            if dataset is None
            else list(dataset)
        )

    return previews

Get sample records (previews) from streams in a source connector.

This operation requires a valid configuration, including any required secrets. Returns a dictionary mapping stream names to lists of sample records, or an error message string if an error occurred for that stream.

def sync_source_to_cache( source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], streams: typing.Annotated[list[str] | str, FieldInfo(annotation=NoneType, required=False, default='suggested', description='The streams to sync.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> str:
def sync_source_to_cache(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str,
        Field(
            description="The streams to sync.",
            default="suggested",
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> str:
    """Run a sync from a source connector to the default DuckDB cache."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    source.set_config(
        resolve_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )
    cache = get_default_cache()

    streams = resolve_list_of_strings(streams)
    if streams and len(streams) == 1 and streams[0] in {"*", "suggested"}:
        # Float '*' and 'suggested' to the top-level for special processing:
        streams = streams[0]

    if streams == "suggested":
        # Prefer the registry's suggested streams; fall back to all streams
        # when metadata is missing or the lookup fails.
        streams = "*"
        try:
            metadata = get_connector_metadata(source_connector_name)
        except Exception:
            streams = "*"
        else:
            if metadata is not None:
                streams = metadata.suggested_streams or "*"

    if isinstance(streams, str) and streams != "*":
        streams = [streams]  # Ensure streams is a list

    source.read(
        cache=cache,
        streams=streams,
    )
    del cache  # Ensure the cache is closed properly

    return (
        f"Sync completed for '{source_connector_name}'!\n\n"
        "Data written to default DuckDB cache\n"
    )

Run a sync from a source connector to the default DuckDB cache.

class CachedDatasetInfo(pydantic.main.BaseModel):
class CachedDatasetInfo(BaseModel):
    """Class to hold information about a cached dataset."""

    stream_name: str
    """The name of the stream in the cache."""
    table_name: str  # Backing cache table name (stream name plus any cache table prefix).
    schema_name: str | None = None  # SQL schema containing the table, when the cache defines one.

Class to hold information about a cached dataset.

stream_name: str

The name of the stream in the cache.

table_name: str
schema_name: str | None
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
def list_cached_streams() -> list[CachedDatasetInfo]:
def list_cached_streams() -> list[CachedDatasetInfo]:
    """List all streams available in the default DuckDB cache."""
    cache: DuckDBCache = get_default_cache()
    prefix = cache.table_prefix or ""
    datasets: list[CachedDatasetInfo] = []
    for name in cache.streams:
        datasets.append(
            CachedDatasetInfo(
                stream_name=name,
                table_name=prefix + name,
                schema_name=cache.schema_name,
            )
        )
    del cache  # Ensure the cache is closed properly
    return datasets

List all streams available in the default DuckDB cache.

def describe_default_cache() -> dict[str, typing.Any]:
def describe_default_cache() -> dict[str, Any]:
    """Describe the currently configured default cache."""
    cache = get_default_cache()
    description: dict[str, Any] = {
        "cache_type": type(cache).__name__,
        "cache_dir": str(cache.cache_dir),
        "cache_db_path": str(Path(cache.db_path).absolute()),
        "cached_streams": list(cache.streams.keys()),
    }
    return description

Describe the currently configured default cache.

def run_sql_query( sql_query: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The SQL query to execute.')], max_records: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=1000, description='Maximum number of records to return.')]) -> list[dict[str, typing.Any]]:
def run_sql_query(
    sql_query: Annotated[
        str,
        Field(description="The SQL query to execute."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="Maximum number of records to return.",
            default=1000,
        ),
    ],
) -> list[dict[str, Any]]:
    """Run a SQL query against the default cache.

    The dialect of SQL should match the dialect of the default cache.
    Use `describe_default_cache` to see the cache type.

    For DuckDB-type caches:
    - Use `SHOW TABLES` to list all tables.
    - Use `DESCRIBE <table_name>` to get the schema of a specific table

    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
    """
    # Reject anything that is not a read-only statement before touching the cache.
    if not _is_safe_sql(sql_query):
        error_row = {
            "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
            "SELECT, DESCRIBE, SHOW, EXPLAIN",
            "SQL_QUERY": sql_query,
        }
        return [error_row]

    cache: DuckDBCache = get_default_cache()
    try:
        return cache.run_sql_query(
            sql_query,
            max_records=max_records,
        )
    except Exception as ex:
        return [
            {
                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
                "TRACEBACK": traceback.format_exc(),
                "SQL_QUERY": sql_query,
            }
        ]
    finally:
        del cache  # Ensure the cache is closed properly

Run a SQL query against the default cache.

The dialect of SQL should match the dialect of the default cache. Use describe_default_cache to see the cache type.

For DuckDB-type caches:

  • Use SHOW TABLES to list all tables.
  • Use DESCRIBE <table_name> to get the schema of a specific table

For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.