airbyte.mcp.local

Local MCP operations.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Local MCP operations."""
  3
  4import sys
  5import traceback
  6from itertools import islice
  7from pathlib import Path
  8from typing import TYPE_CHECKING, Annotated, Any, Literal
  9
 10from fastmcp import FastMCP
 11from fastmcp_extensions import mcp_tool, register_mcp_tools
 12from pydantic import BaseModel, Field
 13
 14from airbyte import get_source
 15from airbyte._util.destination_smoke_tests import (
 16    DestinationSmokeTestResult,
 17    run_destination_smoke_test,
 18)
 19from airbyte._util.meta import is_docker_installed
 20from airbyte.caches.util import get_default_cache
 21from airbyte.destinations.util import get_destination
 22from airbyte.mcp._arg_resolvers import resolve_connector_config, resolve_list_of_strings
 23from airbyte.registry import get_connector_metadata
 24from airbyte.secrets.config import _get_secret_sources
 25from airbyte.secrets.env_vars import DotenvSecretManager
 26from airbyte.secrets.google_gsm import GoogleGSMSecretManager
 27from airbyte.sources.base import Source
 28
 29
 30if TYPE_CHECKING:
 31    from airbyte.caches.duckdb import DuckDBCache
 32
 33
 34_CONFIG_HELP = """
 35You can provide `config` as JSON or a Path to a YAML/JSON file.
 36If a `dict` is provided, it must not contain hardcoded secrets.
 37Instead, secrets should be provided using environment variables,
 38and the config should reference them using the format
 39`secret_reference::ENV_VAR_NAME`.
 40
 41You can also provide a `config_secret_name` to use a specific
 42secret name for the configuration. This is useful if you want to
 43validate a configuration that is stored in a secrets manager.
 44
 45If `config_secret_name` is provided, it should point to a string
 46that contains valid JSON or YAML.
 47
 48If both `config` and `config_secret_name` are provided, the
 49`config` will be loaded first and then the referenced secret config
 50will be layered on top of the non-secret config.
 51
 52For declarative connectors, you can provide a `manifest_path` to
 53specify a local YAML manifest file instead of using the registry
 54version. This is useful for testing custom or locally-developed
 55connector manifests.
 56"""
 57
 58
 59def _get_mcp_source(
 60    connector_name: str,
 61    override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto",
 62    *,
 63    install_if_missing: bool = True,
 64    manifest_path: str | Path | None,
 65) -> Source:
 66    """Get the MCP source for a connector."""
 67    if manifest_path:
 68        override_execution_mode = "yaml"
 69    elif override_execution_mode == "auto" and is_docker_installed():
 70        override_execution_mode = "docker"
 71
 72    source: Source
 73    if override_execution_mode == "auto":
 74        # Use defaults with no overrides
 75        source = get_source(
 76            connector_name,
 77            install_if_missing=False,
 78            source_manifest=manifest_path or None,
 79        )
 80    elif override_execution_mode == "python":
 81        source = get_source(
 82            connector_name,
 83            use_python=True,
 84            install_if_missing=False,
 85            source_manifest=manifest_path or None,
 86        )
 87    elif override_execution_mode == "docker":
 88        source = get_source(
 89            connector_name,
 90            docker_image=True,
 91            install_if_missing=False,
 92            source_manifest=manifest_path or None,
 93        )
 94    elif override_execution_mode == "yaml":
 95        source = get_source(
 96            connector_name,
 97            source_manifest=manifest_path or True,
 98            install_if_missing=False,
 99        )
100    else:
101        raise ValueError(
102            f"Unknown execution method: {override_execution_mode}. "
103            "Expected one of: ['auto', 'docker', 'python', 'yaml']."
104        )
105
106    # Ensure installed:
107    if install_if_missing:
108        source.executor.ensure_installation()
109
110    return source
111
112
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def validate_connector_config(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to validate."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the connector as a dict object or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the connector configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> tuple[bool, str]:
    """Validate a connector configuration.

    Returns a tuple of (is_valid: bool, message: str).

    All failures are reported in the returned message rather than raised,
    so the MCP client never sees an exception from this tool.
    """
    # Step 1: resolve the connector itself (may install it on first use).
    try:
        source: Source = _get_mcp_source(
            connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
    except Exception as ex:
        return False, f"Failed to get connector '{connector_name}': {ex}"

    # Step 2: merge the config sources (dict/file/secret) and apply them.
    try:
        config_dict = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
    except Exception as ex:
        return False, f"Failed to resolve configuration for {connector_name}: {ex}"

    # Step 3: run the connector's own `check` operation against the config.
    try:
        source.check()
    except Exception as ex:
        return False, f"Configuration for {connector_name} is invalid: {ex}"

    return True, f"Configuration for {connector_name} is valid!"
190
191
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def list_connector_config_secrets(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector."),
    ],
) -> list[str]:
    """List all `config_secret_name` options that are known for the given connector.

    This can be used to find out which already-created config secret names are available
    for a given connector. The return value is a list of secret names, but it will not
    return the actual secret values.
    """
    names: list[str] = []
    for manager in _get_secret_sources():
        # Only Google GSM managers can enumerate per-connector secrets.
        if not isinstance(manager, GoogleGSMSecretManager):
            continue
        for handle in manager.fetch_connector_secrets(connector_name):
            # The secret name is the last path segment of the full resource name.
            names.append(handle.secret_name.split("/")[-1])

    return names
219
220
@mcp_tool(
    read_only=True,
    idempotent=True,
    # NOTE: `extra_help_text=_CONFIG_HELP` was removed: this tool accepts no
    # connector config, so the config help text was misleading.
)
def list_dotenv_secrets() -> dict[str, list[str]]:
    """List all environment variable names declared within declared .env files.

    This returns a dictionary mapping the .env file name to a list of environment
    variable names. The values of the environment variables are not returned.
    """
    result: dict[str, list[str]] = {}
    for secrets_mgr in _get_secret_sources():
        # Only .env-backed managers with a resolvable file path are reported.
        if isinstance(secrets_mgr, DotenvSecretManager) and secrets_mgr.dotenv_path:
            result[str(secrets_mgr.dotenv_path.resolve())] = secrets_mgr.list_secrets_names()

    return result
238
239
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_source_streams(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[str]:
    """List all streams available in a source connector.

    This operation (generally) requires a valid configuration, including any required secrets.
    """
    # Resolve the connector, apply the merged configuration, then ask the
    # connector which streams it offers.
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    source.set_config(
        resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )
    return source.get_available_streams()
304
305
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def get_source_stream_json_schema(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """List all properties for a specific stream in a source connector."""
    # Resolve the connector and apply the merged configuration before
    # requesting the stream's JSON schema.
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    source.set_config(
        resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )
    return source.get_stream_json_schema(stream_name=stream_name)
371
372
@mcp_tool(
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def read_source_stream_records(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to read records from."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="The maximum number of records to read.",
            default=1000,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[dict[str, Any]] | str:
    """Get records from a source connector.

    Reads up to `max_records` records from the named stream. On success,
    returns the records as a list of dicts; on any failure, returns a
    formatted error string (including the traceback) instead of raising.
    """
    try:
        source: Source = _get_mcp_source(
            connector_name=source_connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
        config_dict = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
        # First we get a generator for the records in the specified stream.
        record_generator = source.get_records(stream_name)
        # Next we load a limited number of records from the generator into our list.
        records: list[dict[str, Any]] = list(islice(record_generator, max_records))

        # Bug fix: the `file=` keyword was missing, so `sys.stderr` was printed
        # as a second positional value and the message went to stdout.
        print(f"Retrieved {len(records)} records from stream '{stream_name}'", file=sys.stderr)

    except Exception as ex:
        tb_str = traceback.format_exc()
        # On any error, return a descriptive error string instead of raising.
        return (
            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
        )

    else:
        return records
461
462
@mcp_tool(
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def get_stream_previews(
    source_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str | None,
        Field(
            description=(
                "The streams to get previews for. "
                "Use '*' for all streams, or None for selected streams."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="The maximum number of sample records to return per stream.",
            default=10,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, list[dict[str, Any]] | str]:
    """Get sample records (previews) from streams in a source connector.

    This operation requires a valid configuration, including any required secrets.
    Returns a dictionary mapping stream names to lists of sample records, or an error
    message string if an error occurred for that stream.
    """
    source: Source = _get_mcp_source(
        connector_name=source_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    source.set_config(
        resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )

    streams_param: list[str] | Literal["*"] | None = resolve_list_of_strings(
        streams
    )  # pyrefly: ignore[no-matching-overload]
    if streams_param == ["*"]:
        # A lone "*" entry is shorthand for "all streams".
        streams_param = "*"

    try:
        samples_result = source.get_samples(
            streams=streams_param,
            limit=limit,
            on_error="ignore",
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return {
            "ERROR": f"Error getting stream previews from source '{source_name}': "
            f"{ex!r}, {ex!s}\n{tb_str}"
        }

    # A `None` dataset means sampling failed for that stream; report it inline.
    previews: dict[str, list[dict[str, Any]] | str] = {}
    for stream_name, dataset in samples_result.items():
        previews[stream_name] = (
            f"Could not retrieve stream samples for stream '{stream_name}'"
            if dataset is None
            else list(dataset)
        )

    return previews
573
574
@mcp_tool(
    destructive=False,
    extra_help_text=_CONFIG_HELP,
)
def sync_source_to_cache(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str,
        Field(
            description="The streams to sync.",
            default="suggested",
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> str:
    """Run a sync from a source connector to the default DuckDB cache."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    source.set_config(
        resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
    )
    cache = get_default_cache()

    selected: list[str] | str | None = resolve_list_of_strings(streams)
    if selected and len(selected) == 1 and selected[0] in {"*", "suggested"}:
        # Float '*' and 'suggested' to the top level for special processing:
        selected = selected[0]

    if selected == "suggested":
        # Prefer the registry's suggested streams; fall back to all streams
        # whenever metadata is unavailable or lists no suggestions.
        selected = "*"
        try:
            metadata = get_connector_metadata(
                source_connector_name,
            )
        except Exception:
            selected = "*"  # Metadata lookup failed; keep the all-streams fallback.
        else:
            if metadata is not None:
                selected = metadata.suggested_streams or "*"

    if isinstance(selected, str) and selected != "*":
        selected = [selected]  # Any other single string becomes a one-item list.

    source.read(
        cache=cache,
        streams=selected,
    )
    del cache  # Ensure the cache is closed properly

    return (
        f"Sync completed for '{source_connector_name}'!\n\n"
        "Data written to default DuckDB cache\n"
    )
672
673
class CachedDatasetInfo(BaseModel):
    """Class to hold information about a cached dataset."""

    stream_name: str
    """The name of the stream in the cache."""
    table_name: str  # Table name backing the stream (cache table prefix + stream name).
    schema_name: str | None = None  # SQL schema containing the table, if known.
681
682
@mcp_tool(
    read_only=True,
    idempotent=True,
    # NOTE: `extra_help_text=_CONFIG_HELP` was removed: this tool accepts no
    # connector config, so the config help text was misleading.
)
def list_cached_streams() -> list[CachedDatasetInfo]:
    """List all streams available in the default DuckDB cache."""
    cache: DuckDBCache = get_default_cache()
    result = [
        CachedDatasetInfo(
            stream_name=stream_name,
            # The physical table is the stream name with the cache's prefix (if any).
            table_name=(cache.table_prefix or "") + stream_name,
            schema_name=cache.schema_name,
        )
        for stream_name in cache.streams
    ]
    del cache  # Ensure the cache is closed properly
    return result
701
702
@mcp_tool(
    read_only=True,
    idempotent=True,
    # NOTE: `extra_help_text=_CONFIG_HELP` was removed: this tool accepts no
    # connector config, so the config help text was misleading.
)
def describe_default_cache() -> dict[str, Any]:
    """Describe the currently configured default cache.

    Returns the cache's type name, directory, database path, and the names
    of the streams it currently holds.
    """
    cache = get_default_cache()
    return {
        "cache_type": type(cache).__name__,
        "cache_dir": str(cache.cache_dir),
        "cache_db_path": str(Path(cache.db_path).absolute()),
        "cached_streams": list(cache.streams.keys()),
    }
717
718
719def _is_safe_sql(sql_query: str) -> bool:
720    """Check if a SQL query is safe to execute.
721
722    For security reasons, we only allow read-only operations like SELECT, DESCRIBE, and SHOW.
723    Multi-statement queries (containing semicolons) are also disallowed for security.
724
725    Note: SQLAlchemy will also validate downstream, but this is a first-pass check.
726
727    Args:
728        sql_query: The SQL query to check
729
730    Returns:
731        True if the query is safe to execute, False otherwise
732    """
733    # Remove leading/trailing whitespace and convert to uppercase for checking
734    normalized_query = sql_query.strip().upper()
735
736    # Disallow multi-statement queries (containing semicolons)
737    # Note: We check the original query to catch semicolons anywhere, including in comments
738    if ";" in sql_query:
739        return False
740
741    # List of allowed SQL statement prefixes (read-only operations)
742    allowed_prefixes = (
743        "SELECT",
744        "DESCRIBE",
745        "DESC",  # Short form of DESCRIBE
746        "SHOW",
747        "EXPLAIN",  # Also safe - shows query execution plan
748    )
749
750    # Check if the query starts with any allowed prefix
751    return any(normalized_query.startswith(prefix) for prefix in allowed_prefixes)
752
753
@mcp_tool(
    read_only=True,
    idempotent=True,
    # NOTE: `extra_help_text=_CONFIG_HELP` was removed: this tool takes a SQL
    # query, not a connector config, so the config help text was misleading.
)
def run_sql_query(
    sql_query: Annotated[
        str,
        Field(description="The SQL query to execute."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="Maximum number of records to return.",
            default=1000,
        ),
    ],
) -> list[dict[str, Any]]:
    """Run a SQL query against the default cache.

    The dialect of SQL should match the dialect of the default cache.
    Use `describe_default_cache` to see the cache type.

    For DuckDB-type caches:
    - Use `SHOW TABLES` to list all tables.
    - Use `DESCRIBE <table_name>` to get the schema of a specific table

    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
    """
    # Reject anything that isn't a single read-only statement.
    if not _is_safe_sql(sql_query):
        return [
            {
                "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
                "SELECT, DESCRIBE, SHOW, EXPLAIN",
                "SQL_QUERY": sql_query,
            }
        ]

    cache: DuckDBCache = get_default_cache()
    try:
        return cache.run_sql_query(
            sql_query,
            max_records=max_records,
        )
    except Exception as ex:
        # Report failures as structured data rather than raising to the client.
        tb_str = traceback.format_exc()
        return [
            {
                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
                "TRACEBACK": tb_str,
                "SQL_QUERY": sql_query,
            }
        ]
    finally:
        del cache  # Ensure the cache is closed properly
810
811
@mcp_tool(
    destructive=True,
)
def destination_smoke_test(  # noqa: PLR0913, PLR0917
    destination_connector_name: Annotated[
        str,
        Field(
            description=(
                "The name of the destination connector to test "
                "(e.g. 'destination-snowflake', 'destination-motherduck')."
            ),
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description=(
                "The destination configuration as a dict object or JSON string. "
                "Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead."
            ),
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the destination configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the destination configuration.",
            default=None,
        ),
    ],
    scenarios: Annotated[
        list[str] | str,
        Field(
            description=(
                "Which scenarios to run. Use 'fast' (default) for all fast predefined "
                "scenarios (excludes large_batch_stream), 'all' for every predefined "
                "scenario including large batch, or provide a list of scenario names "
                "or a comma-separated string."
            ),
            default="fast",
        ),
    ],
    custom_scenarios: Annotated[
        list[dict[str, Any]] | None,
        Field(
            description=(
                "Additional custom test scenarios to inject. Each scenario should define "
                "'name', 'json_schema', and optionally 'records' and 'primary_key'. "
                "These are unioned with the predefined scenarios."
            ),
            default=None,
        ),
    ],
    docker_image: Annotated[
        str | None,
        Field(
            description=(
                "Optional Docker image override for the destination connector "
                "(e.g. 'airbyte/destination-snowflake:3.14.0')."
            ),
            default=None,
        ),
    ],
    namespace_suffix: Annotated[
        str | None,
        Field(
            description=(
                "Optional suffix appended to the auto-generated namespace. "
                "Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). "
                "Use this to distinguish concurrent runs."
            ),
            default=None,
        ),
    ],
    reuse_namespace: Annotated[
        str | None,
        Field(
            description=(
                "Exact namespace to reuse from a previous run. "
                "When set, no new namespace is generated. "
                "Useful for running a second test against an already-populated namespace."
            ),
            default=None,
        ),
    ],
) -> DestinationSmokeTestResult:
    """Run smoke tests against a destination connector.

    Sends synthetic test data from the smoke test source to the specified
    destination and reports success or failure. The smoke test source generates
    data across predefined scenarios covering common destination failure patterns:
    type variations, null handling, naming edge cases, schema variations, and
    batch sizes.

    This tool does NOT read back data from the destination or compare results.
    It only verifies that the destination accepts the data without errors.
    """
    # Resolve the destination config from dict/file/secret inputs.
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
    )

    # Pick the execution image: an explicit override wins; otherwise use the
    # default Docker image when Docker is available on this host.
    image_override: str | bool | None = None
    if docker_image:
        image_override = docker_image
    elif is_docker_installed():
        image_override = True

    destination_kwargs: dict[str, Any] = {
        "name": destination_connector_name,
        "config": config_dict,
    }
    if image_override is not None:
        destination_kwargs["docker_image"] = image_override

    destination_obj = get_destination(**destination_kwargs)

    # Normalize the scenario selection for the shared helper: strings pass
    # through unchanged; lists are resolved, with 'fast' as the empty fallback.
    resolved_scenarios: str | list[str] = (
        scenarios
        if isinstance(scenarios, str)
        else (resolve_list_of_strings(scenarios) or "fast")
    )

    return run_destination_smoke_test(
        destination=destination_obj,
        scenarios=resolved_scenarios,
        namespace_suffix=namespace_suffix,
        reuse_namespace=reuse_namespace,
        custom_scenarios=custom_scenarios,
    )
949
950
def register_local_tools(app: FastMCP) -> None:
    """Register local tools with the FastMCP app.

    Args:
        app: FastMCP application instance
    """
    # Discovers every @mcp_tool-decorated function in this module and
    # registers it with the given app.
    register_mcp_tools(app, mcp_module=__name__)
@mcp_tool(read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def validate_connector_config( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connector to validate.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the connector as a dict object or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the connector configuration.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> tuple[bool, str]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def validate_connector_config(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to validate."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the connector as a dict object or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the connector configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> tuple[bool, str]:
    """Validate a connector configuration.

    Returns a tuple of (is_valid: bool, message: str).
    """
    # Step 1: instantiate the connector itself. Any failure here means the
    # connector (not the config) could not be resolved.
    try:
        source: Source = _get_mcp_source(
            connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
    except Exception as ex:
        return False, f"Failed to get connector '{connector_name}': {ex}"

    # Step 2: merge the config inputs (inline dict/JSON, file, secret) and
    # apply the result to the connector.
    try:
        resolved_config = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(resolved_config)
    except Exception as ex:
        return False, f"Failed to resolve configuration for {connector_name}: {ex}"

    # Step 3: run the connector's own `check` operation against the config.
    try:
        source.check()
    except Exception as ex:
        return False, f"Configuration for {connector_name} is invalid: {ex}"

    return True, f"Configuration for {connector_name} is valid!"

Validate a connector configuration.

Returns a tuple of (is_valid: bool, message: str).

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

@mcp_tool(read_only=True, idempotent=True)
def list_connector_config_secrets( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connector.')]) -> list[str]:
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def list_connector_config_secrets(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector."),
    ],
) -> list[str]:
    """List all `config_secret_name` options that are known for the given connector.

    This can be used to find out which already-created config secret names are available
    for a given connector. The return value is a list of secret names, but it will not
    return the actual secret values.
    """
    # Only Google GSM secret managers can enumerate connector secrets; every
    # other secret source is skipped. Only the trailing path segment of each
    # secret's fully-qualified name is returned — never the secret value.
    return [
        handle.secret_name.split("/")[-1]
        for manager in _get_secret_sources()
        if isinstance(manager, GoogleGSMSecretManager)
        for handle in manager.fetch_connector_secrets(connector_name)
    ]

List all config_secret_name options that are known for the given connector.

This can be used to find out which already-created config secret names are available for a given connector. The return value is a list of secret names, but it will not return the actual secret values.

@mcp_tool(read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def list_dotenv_secrets() -> dict[str, list[str]]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    # NOTE: `extra_help_text=_CONFIG_HELP` was removed here. This tool accepts
    # no `config`/`config_secret_name` arguments, so appending the config help
    # text produced misleading tool documentation.
)
def list_dotenv_secrets() -> dict[str, list[str]]:
    """List all environment variable names declared within the configured .env files.

    This returns a dictionary mapping the resolved .env file path to a list of
    environment variable names. The values of the environment variables are not returned.
    """
    result: dict[str, list[str]] = {}
    for secrets_mgr in _get_secret_sources():
        # Only dotenv-backed managers with a concrete file path can be listed;
        # other secret sources (e.g. GSM) are intentionally skipped.
        if isinstance(secrets_mgr, DotenvSecretManager) and secrets_mgr.dotenv_path:
            result[str(secrets_mgr.dotenv_path.resolve())] = secrets_mgr.list_secrets_names()

    return result

List all environment variable names declared within the configured .env files.

This returns a dictionary mapping the .env file name to a list of environment
variable names. The values of the environment variables are not returned.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

@mcp_tool(read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def list_source_streams( source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> list[str]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_source_streams(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[str]:
    """List all streams available in a source connector.

    This operation (generally) requires a valid configuration, including any required secrets.
    """
    # Instantiate the connector, honoring any execution-mode override or
    # local manifest.
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    # Merge the config inputs (inline dict/JSON, file, secret) into one dict
    # and apply it before asking the connector for its stream catalog.
    resolved_config = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(resolved_config)
    return source.get_available_streams()

List all streams available in a source connector.

This operation (generally) requires a valid configuration, including any required secrets.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

@mcp_tool(read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def get_source_stream_json_schema( source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], stream_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the stream.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> dict[str, typing.Any]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def get_source_stream_json_schema(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """List all properties for a specific stream in a source connector."""
    # Instantiate the connector, honoring any execution-mode override or
    # local manifest.
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    # Merge the config inputs (inline dict/JSON, file, secret) and apply them
    # before requesting the stream's JSON schema.
    resolved_config = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(resolved_config)
    return source.get_stream_json_schema(stream_name=stream_name)

List all properties for a specific stream in a source connector.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

@mcp_tool(read_only=True, extra_help_text=_CONFIG_HELP)
def read_source_stream_records( source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], *, stream_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the stream to read records from.')], max_records: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=1000, description='The maximum number of records to read.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> list[dict[str, typing.Any]] | str:
@mcp_tool(
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def read_source_stream_records(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to read records from."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="The maximum number of records to read.",
            default=1000,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[dict[str, Any]] | str:
    """Get records from a source connector.

    Returns up to `max_records` records from the named stream, or a diagnostic
    string (never raises) if the connector, config, or read fails.
    """
    try:
        source: Source = _get_mcp_source(
            connector_name=source_connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
        config_dict = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
        # First we get a generator for the records in the specified stream.
        record_generator = source.get_records(stream_name)
        # Next we load a limited number of records from the generator into our list.
        records: list[dict[str, Any]] = list(islice(record_generator, max_records))

        # BUGFIX: `sys.stderr` was previously passed positionally, so the
        # message (plus the stderr object's repr) went to stdout. Diagnostics
        # must use `file=sys.stderr` to keep stdout clean.
        print(
            f"Retrieved {len(records)} records from stream '{stream_name}'",
            file=sys.stderr,
        )

    except Exception as ex:
        tb_str = traceback.format_exc()
        # If any error occurs, return a diagnostic string (including the
        # traceback) instead of raising, so the MCP caller sees the failure.
        return (
            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
        )

    else:
        return records

Get records from a source connector.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

@mcp_tool(read_only=True, extra_help_text=_CONFIG_HELP)
def get_stream_previews( source_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], streams: typing.Annotated[list[str] | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description="The streams to get previews for. Use '*' for all streams, or None for selected streams.")], limit: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=10, description='The maximum number of sample records to return per stream.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> dict[str, list[dict[str, typing.Any]] | str]:
@mcp_tool(
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def get_stream_previews(
    source_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str | None,
        Field(
            description=(
                "The streams to get previews for. "
                "Use '*' for all streams, or None for selected streams."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="The maximum number of sample records to return per stream.",
            default=10,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, list[dict[str, Any]] | str]:
    """Get sample records (previews) from streams in a source connector.

    This operation requires a valid configuration, including any required secrets.
    Returns a dictionary mapping stream names to lists of sample records, or an error
    message string if an error occurred for that stream.
    """
    source: Source = _get_mcp_source(
        connector_name=source_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )

    resolved_config = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(resolved_config)

    # Normalize the streams argument; a single-element "*" collapses to the
    # literal "*" sentinel meaning "all streams".
    streams_param: list[str] | Literal["*"] | None = resolve_list_of_strings(
        streams
    )  # pyrefly: ignore[no-matching-overload]
    if streams_param and len(streams_param) == 1 and streams_param[0] == "*":
        streams_param = "*"

    try:
        samples_result = source.get_samples(
            streams=streams_param,
            limit=limit,
            on_error="ignore",
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        # A top-level failure is reported under a single "ERROR" key.
        return {
            "ERROR": f"Error getting stream previews from source '{source_name}': "
            f"{ex!r}, {ex!s}\n{tb_str}"
        }

    # Per-stream: a None dataset means sampling failed for that stream only.
    return {
        stream_name: (
            list(dataset)
            if dataset is not None
            else f"Could not retrieve stream samples for stream '{stream_name}'"
        )
        for stream_name, dataset in samples_result.items()
    }

Get sample records (previews) from streams in a source connector.

This operation requires a valid configuration, including any required secrets.
Returns a dictionary mapping stream names to lists of sample records, or an error
message string if an error occurred for that stream.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

@mcp_tool(destructive=False, extra_help_text=_CONFIG_HELP)
def sync_source_to_cache( source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the source connector.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector as a dict or JSON string.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the source connector config.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], streams: typing.Annotated[list[str] | str, FieldInfo(annotation=NoneType, required=False, default='suggested', description='The streams to sync.')], override_execution_mode: Annotated[Literal['docker', 'python', 'yaml', 'auto'], FieldInfo(annotation=NoneType, required=False, default='auto', description='Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).')], manifest_path: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a local YAML manifest file for declarative connectors.')]) -> str:
@mcp_tool(
    destructive=False,
    extra_help_text=_CONFIG_HELP,
)
def sync_source_to_cache(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str,
        Field(
            description="The streams to sync.",
            default="suggested",
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> str:
    """Run a sync from a source connector to the default DuckDB cache.

    Resolves the connector configuration (non-secret config layered with any
    referenced secret config), normalizes the ``streams`` selection, and reads
    the selected streams into the default cache.

    Returns:
        A human-readable summary of the completed sync.
    """
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    cache = get_default_cache()

    source.read(
        cache=cache,
        streams=_resolve_stream_selection(streams, source_connector_name),
    )
    del cache  # Ensure the cache is closed properly

    summary: str = f"Sync completed for '{source_connector_name}'!\n\n"
    summary += "Data written to default DuckDB cache\n"
    return summary


def _resolve_stream_selection(
    streams: list[str] | str,
    source_connector_name: str,
) -> list[str] | str:
    """Normalize the user-provided stream selection for `Source.read()`.

    Rules:
    - A single-element list containing '*' or 'suggested' is unwrapped to the
      bare string for special processing.
    - 'suggested' resolves to the connector's suggested streams from registry
      metadata, falling back to '*' (all streams) on any lookup failure.
    - Any other single string is wrapped in a one-element list.
    """
    streams = resolve_list_of_strings(streams)
    if streams and len(streams) == 1 and streams[0] in {"*", "suggested"}:
        # Float '*' and 'suggested' to the top-level for special processing:
        streams = streams[0]

    if streams == "suggested":
        # Default to all streams unless registry metadata provides suggestions.
        streams = "*"
        try:
            metadata = get_connector_metadata(source_connector_name)
        except Exception:  # Best-effort lookup; keep the '*' fallback.
            pass
        else:
            if metadata is not None:
                streams = metadata.suggested_streams or "*"

    if isinstance(streams, str) and streams != "*":
        streams = [streams]  # Ensure streams is a list
    return streams

Run a sync from a source connector to the default DuckDB cache.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

class CachedDatasetInfo(pydantic.main.BaseModel):
class CachedDatasetInfo(BaseModel):
    """Class to hold information about a cached dataset."""

    stream_name: str
    """The name of the stream in the cache."""
    table_name: str  # Physical table name: the stream name plus any cache table prefix.
    schema_name: str | None = None  # Schema containing the table, if the cache defines one.

Class to hold information about a cached dataset.

stream_name: str

The name of the stream in the cache.

table_name: str
schema_name: str | None = None
@mcp_tool(read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def list_cached_streams() -> list[CachedDatasetInfo]:
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def list_cached_streams() -> list[CachedDatasetInfo]:
    """List all streams available in the default DuckDB cache.

    Returns one entry per cached stream, including the physical table name
    (the stream name plus any configured table prefix) and the schema name.
    """
    # NOTE: `_CONFIG_HELP` is intentionally not appended here — this tool
    # takes no connector configuration arguments.
    cache: DuckDBCache = get_default_cache()
    result = [
        CachedDatasetInfo(
            stream_name=stream_name,
            table_name=(cache.table_prefix or "") + stream_name,
            schema_name=cache.schema_name,
        )
        for stream_name in cache.streams
    ]
    del cache  # Ensure the cache is closed properly
    return result

List all streams available in the default DuckDB cache.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

@mcp_tool(read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def describe_default_cache() -> dict[str, typing.Any]:
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def describe_default_cache() -> dict[str, Any]:
    """Describe the currently configured default cache.

    Returns the cache type name, its directory, the absolute database path,
    and the list of stream names currently cached.
    """
    # NOTE: `_CONFIG_HELP` is intentionally not appended here — this tool
    # takes no connector configuration arguments.
    cache = get_default_cache()
    result: dict[str, Any] = {
        "cache_type": type(cache).__name__,
        "cache_dir": str(cache.cache_dir),
        "cache_db_path": str(Path(cache.db_path).absolute()),
        "cached_streams": list(cache.streams.keys()),
    }
    del cache  # Ensure the cache is closed properly (matches the other cache tools).
    return result

Describe the currently configured default cache.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

@mcp_tool(read_only=True, idempotent=True, extra_help_text=_CONFIG_HELP)
def run_sql_query( sql_query: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The SQL query to execute.')], max_records: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=1000, description='Maximum number of records to return.')]) -> list[dict[str, typing.Any]]:
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def run_sql_query(
    sql_query: Annotated[
        str,
        Field(description="The SQL query to execute."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="Maximum number of records to return.",
            default=1000,
        ),
    ],
) -> list[dict[str, Any]]:
    """Run a SQL query against the default cache.

    The dialect of SQL should match the dialect of the default cache.
    Use `describe_default_cache` to see the cache type.

    For DuckDB-type caches:
    - Use `SHOW TABLES` to list all tables.
    - Use `DESCRIBE <table_name>` to get the schema of a specific table

    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
    """
    # NOTE: `_CONFIG_HELP` is intentionally not appended here — this tool
    # takes no connector configuration arguments.
    # Check if the query is safe to execute
    if not _is_safe_sql(sql_query):
        return [
            {
                "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
                "SELECT, DESCRIBE, SHOW, EXPLAIN",
                "SQL_QUERY": sql_query,
            }
        ]

    cache: DuckDBCache = get_default_cache()
    try:
        return cache.run_sql_query(
            sql_query,
            max_records=max_records,
        )
    except Exception as ex:
        # Surface the failure as data rather than raising, so the MCP client
        # always receives a structured response.
        tb_str = traceback.format_exc()
        return [
            {
                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
                "TRACEBACK": tb_str,
                "SQL_QUERY": sql_query,
            }
        ]
    finally:
        del cache  # Ensure the cache is closed properly

Run a SQL query against the default cache.

The dialect of SQL should match the dialect of the default cache.
Use `describe_default_cache` to see the cache type.

For DuckDB-type caches:
- Use `SHOW TABLES` to list all tables.
- Use `DESCRIBE <table_name>` to get the schema of a specific table

For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

@mcp_tool(destructive=True)
def destination_smoke_test( destination_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The name of the destination connector to test (e.g. 'destination-snowflake', 'destination-motherduck').")], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The destination configuration as a dict object or JSON string. Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead.')], config_file: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Path to a YAML or JSON file containing the destination configuration.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the destination configuration.')], scenarios: typing.Annotated[list[str] | str, FieldInfo(annotation=NoneType, required=False, default='fast', description="Which scenarios to run. Use 'fast' (default) for all fast predefined scenarios (excludes large_batch_stream), 'all' for every predefined scenario including large batch, or provide a list of scenario names or a comma-separated string.")], custom_scenarios: typing.Annotated[list[dict[str, typing.Any]] | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Additional custom test scenarios to inject. Each scenario should define 'name', 'json_schema', and optionally 'records' and 'primary_key'. These are unioned with the predefined scenarios.")], docker_image: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Optional Docker image override for the destination connector (e.g. 'airbyte/destination-snowflake:3.14.0').")], namespace_suffix: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Optional suffix appended to the auto-generated namespace. Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). Use this to distinguish concurrent runs.")], reuse_namespace: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Exact namespace to reuse from a previous run. When set, no new namespace is generated. Useful for running a second test against an already-populated namespace.')]) -> airbyte._util.destination_smoke_tests.DestinationSmokeTestResult:
@mcp_tool(
    destructive=True,
)
def destination_smoke_test(  # noqa: PLR0913, PLR0917
    destination_connector_name: Annotated[
        str,
        Field(
            description=(
                "The name of the destination connector to test "
                "(e.g. 'destination-snowflake', 'destination-motherduck')."
            ),
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description=(
                "The destination configuration as a dict object or JSON string. "
                "Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead."
            ),
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the destination configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the destination configuration.",
            default=None,
        ),
    ],
    scenarios: Annotated[
        list[str] | str,
        Field(
            description=(
                "Which scenarios to run. Use 'fast' (default) for all fast predefined "
                "scenarios (excludes large_batch_stream), 'all' for every predefined "
                "scenario including large batch, or provide a list of scenario names "
                "or a comma-separated string."
            ),
            default="fast",
        ),
    ],
    custom_scenarios: Annotated[
        list[dict[str, Any]] | None,
        Field(
            description=(
                "Additional custom test scenarios to inject. Each scenario should define "
                "'name', 'json_schema', and optionally 'records' and 'primary_key'. "
                "These are unioned with the predefined scenarios."
            ),
            default=None,
        ),
    ],
    docker_image: Annotated[
        str | None,
        Field(
            description=(
                "Optional Docker image override for the destination connector "
                "(e.g. 'airbyte/destination-snowflake:3.14.0')."
            ),
            default=None,
        ),
    ],
    namespace_suffix: Annotated[
        str | None,
        Field(
            description=(
                "Optional suffix appended to the auto-generated namespace. "
                "Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). "
                "Use this to distinguish concurrent runs."
            ),
            default=None,
        ),
    ],
    reuse_namespace: Annotated[
        str | None,
        Field(
            description=(
                "Exact namespace to reuse from a previous run. "
                "When set, no new namespace is generated. "
                "Useful for running a second test against an already-populated namespace."
            ),
            default=None,
        ),
    ],
) -> DestinationSmokeTestResult:
    """Run smoke tests against a destination connector.

    Sends synthetic test data from the smoke test source to the specified
    destination and reports success or failure. The smoke test source generates
    data across predefined scenarios covering common destination failure patterns:
    type variations, null handling, naming edge cases, schema variations, and
    batch sizes.

    This tool does NOT read back data from the destination or compare results.
    It only verifies that the destination accepts the data without errors.
    """
    # Layer the non-secret config with any referenced secret config.
    resolved_config = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
    )

    # Build the destination, preferring an explicit image override, then
    # Docker auto-detection, then whatever execution mode the helper picks.
    dest_kwargs: dict[str, Any] = {
        "name": destination_connector_name,
        "config": resolved_config,
    }
    if docker_image:
        dest_kwargs["docker_image"] = docker_image
    elif is_docker_installed():
        dest_kwargs["docker_image"] = True
    destination = get_destination(**dest_kwargs)

    # Pass strings ('fast'/'all'/comma-separated) through untouched; coerce
    # list input to a clean list of names, defaulting to 'fast' when empty.
    scenario_selection: str | list[str] = (
        scenarios
        if isinstance(scenarios, str)
        else (resolve_list_of_strings(scenarios) or "fast")
    )

    return run_destination_smoke_test(
        destination=destination,
        scenarios=scenario_selection,
        namespace_suffix=namespace_suffix,
        reuse_namespace=reuse_namespace,
        custom_scenarios=custom_scenarios,
    )

Run smoke tests against a destination connector.

Sends synthetic test data from the smoke test source to the specified destination and reports success or failure. The smoke test source generates data across predefined scenarios covering common destination failure patterns: type variations, null handling, naming edge cases, schema variations, and batch sizes.

This tool does NOT read back data from the destination or compare results. It only verifies that the destination accepts the data without errors.

def register_local_tools(app: fastmcp.server.server.FastMCP) -> None:
def register_local_tools(app: FastMCP) -> None:
    """Register this module's local MCP tools on the given FastMCP app.

    Args:
        app: FastMCP application instance
    """
    register_mcp_tools(app, mcp_module=__name__)

Register local tools with the FastMCP app.

Arguments:
  • app: FastMCP application instance