airbyte.mcp.local

Local MCP operations.

local module

MCP primitives registered by the local module of the airbyte-mcp server: 12 tool(s), 0 prompt(s), 0 resource(s).

Tools (12)

describe_default_cache

Hints: read-only · idempotent

Describe the currently configured default cache.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.
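
For example, a config for a hypothetical Postgres-style source might reference its password through an environment variable rather than embedding it. This is a minimal sketch; the field names are illustrative and depend on the connector's actual spec:

# Illustrative only: field names depend on the connector's spec.
example_config = {
    "host": "db.example.com",
    "port": 5432,
    "username": "analytics_ro",
    # Resolved from the POSTGRES_PASSWORD environment variable at runtime;
    # the secret value itself never appears in the config.
    "password": "secret_reference::POSTGRES_PASSWORD",
}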

Parameters

_No parameters._

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {},
  "type": "object"
}

Show output JSON schema

{
  "additionalProperties": true,
  "type": "object"
}

destination_smoke_test

Hints: destructive

Run smoke tests against a destination connector.

Sends synthetic test data from the smoke test source to the specified destination and reports success or failure. The smoke test source generates data across predefined scenarios covering common destination failure patterns: type variations, null handling, naming edge cases, schema variations, and batch sizes.

When the destination has a compatible cache implementation (DuckDB, Postgres, Snowflake, BigQuery, MotherDuck), readback introspection is automatically performed after a successful write. The readback produces stats on the written data: table row counts, column names/types, and per-column null/non-null counts. Results are included in the response as table_statistics and tables_not_found.
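
As a rough sketch of the shape of that readback data (values invented; field names follow the output schema shown below), a table_statistics entry might look like:

# Invented values; field names follow the tool's output JSON schema.
example_table_statistics = {
    "basic_types_stream": {  # hypothetical stream/table name
        "table_name": "basic_types_stream",
        "schema_name": "zz_deleteme_20240101_0000_smoke_test",
        "row_count": 25,
        "column_statistics": [
            {"column_name": "id", "column_type": "BIGINT",
             "null_count": 0, "non_null_count": 25, "total_count": 25},
        ],
    },
}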

Parameters

Name Type Required Default Description
destination_connector_name string yes The name of the destination connector to test (e.g. 'destination-snowflake', 'destination-motherduck').
config object | string | null no null The destination configuration as a dict object or JSON string. Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead.
config_file string | null no null Path to a YAML or JSON file containing the destination configuration.
config_secret_name string | null no null The name of the secret containing the destination configuration.
scenarios array<string> | string no "fast" Which scenarios to run. Use 'fast' (default) for all fast predefined scenarios (excludes large_batch_stream), 'all' for every predefined scenario including large batch, or provide a list of scenario names or a comma-separated string.
custom_scenarios array<object> | null no null Additional custom test scenarios to inject. Each scenario should define 'name', 'json_schema', and optionally 'records' and 'primary_key'. These are unioned with the predefined scenarios.
docker_image string | null no null Optional Docker image override for the destination connector (e.g. 'airbyte/destination-snowflake:3.14.0').
namespace_suffix string | null no null Optional suffix appended to the auto-generated namespace. Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). Use this to distinguish concurrent runs.
reuse_namespace string | null no null Exact namespace to reuse from a previous run. When set, no new namespace is generated. Useful for running a second test against an already-populated namespace.
skip_preflight boolean no false Skip the automatic preflight check that runs basic_types before the requested scenarios. Set to true when you expect basic_types itself to fail or want to save time on repeated runs.
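
A minimal sketch of the tool arguments, assuming a MotherDuck destination whose credentials are referenced from the environment. The config keys here are illustrative, not the authoritative destination-motherduck spec:

# Illustrative arguments; config keys depend on the destination's actual spec.
smoke_test_args = {
    "destination_connector_name": "destination-motherduck",
    "config": {
        "destination_path": "md:my_db",  # hypothetical field name
        "motherduck_api_key": "secret_reference::MOTHERDUCK_API_KEY",
    },
    "scenarios": "fast",
    "namespace_suffix": "smoke_test",
    "custom_scenarios": [
        {
            "name": "orders_minimal",  # custom scenario, unioned with the predefined ones
            "json_schema": {"type": "object", "properties": {"order_id": {"type": "integer"}}},
            "primary_key": ["order_id"],
        }
    ],
}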

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "destination_connector_name": {
      "description": "The name of the destination connector to test (e.g. 'destination-snowflake', 'destination-motherduck').",
      "type": "string"
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The destination configuration as a dict object or JSON string. Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead."
    },
    "config_file": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a YAML or JSON file containing the destination configuration."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the destination configuration."
    },
    "scenarios": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "string"
        }
      ],
      "default": "fast",
      "description": "Which scenarios to run. Use 'fast' (default) for all fast predefined scenarios (excludes large_batch_stream), 'all' for every predefined scenario including large batch, or provide a list of scenario names or a comma-separated string."
    },
    "custom_scenarios": {
      "anyOf": [
        {
          "items": {
            "additionalProperties": true,
            "type": "object"
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Additional custom test scenarios to inject. Each scenario should define 'name', 'json_schema', and optionally 'records' and 'primary_key'. These are unioned with the predefined scenarios."
    },
    "docker_image": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional Docker image override for the destination connector (e.g. 'airbyte/destination-snowflake:3.14.0')."
    },
    "namespace_suffix": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional suffix appended to the auto-generated namespace. Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). Use this to distinguish concurrent runs."
    },
    "reuse_namespace": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Exact namespace to reuse from a previous run. When set, no new namespace is generated. Useful for running a second test against an already-populated namespace."
    },
    "skip_preflight": {
      "default": false,
      "description": "Skip the automatic preflight check that runs basic_types before the requested scenarios. Set to true when you expect basic_types itself to fail or want to save time on repeated runs.",
      "type": "boolean"
    }
  },
  "required": [
    "destination_connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of a destination smoke test run.",
  "properties": {
    "success": {
      "type": "boolean"
    },
    "destination": {
      "type": "string"
    },
    "namespace": {
      "type": "string"
    },
    "records_delivered": {
      "type": "integer"
    },
    "scenarios_requested": {
      "type": "string"
    },
    "elapsed_seconds": {
      "type": "number"
    },
    "error": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "preflight_passed": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "table_statistics": {
      "anyOf": [
        {
          "additionalProperties": {
            "description": "Statistics for a single table: row count, column info, and per-column stats.",
            "properties": {
              "table_name": {
                "type": "string"
              },
              "database_name": {
                "anyOf": [
                  {
                    "type": "string"
                  },
                  {
                    "type": "null"
                  }
                ],
                "default": null
              },
              "schema_name": {
                "anyOf": [
                  {
                    "type": "string"
                  },
                  {
                    "type": "null"
                  }
                ],
                "default": null
              },
              "row_count": {
                "anyOf": [
                  {
                    "type": "integer"
                  },
                  {
                    "type": "null"
                  }
                ],
                "default": null
              },
              "column_statistics": {
                "items": {
                  "description": "Null/non-null statistics for a single column.",
                  "properties": {
                    "column_name": {
                      "type": "string"
                    },
                    "column_type": {
                      "type": "string"
                    },
                    "null_count": {
                      "anyOf": [
                        {
                          "type": "integer"
                        },
                        {
                          "type": "null"
                        }
                      ],
                      "default": null
                    },
                    "non_null_count": {
                      "anyOf": [
                        {
                          "type": "integer"
                        },
                        {
                          "type": "null"
                        }
                      ],
                      "default": null
                    },
                    "total_count": {
                      "anyOf": [
                        {
                          "type": "integer"
                        },
                        {
                          "type": "null"
                        }
                      ],
                      "default": null
                    }
                  },
                  "required": [
                    "column_name",
                    "column_type"
                  ],
                  "type": "object"
                },
                "type": "array"
              }
            },
            "required": [
              "table_name",
              "column_statistics"
            ],
            "type": "object"
          },
          "type": "object"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "tables_not_found": {
      "anyOf": [
        {
          "additionalProperties": {
            "type": "string"
          },
          "type": "object"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "warnings": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    }
  },
  "required": [
    "success",
    "destination",
    "namespace",
    "records_delivered",
    "scenarios_requested",
    "elapsed_seconds"
  ],
  "type": "object"
}

get_source_stream_json_schema

Hints: read-only · idempotent

List all properties for a specific stream in a source connector.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

Parameters

Name Type Required Default Description
source_connector_name string yes The name of the source connector.
stream_name string yes The name of the stream.
config object | string | null no null The configuration for the source connector as a dict or JSON string.
config_file string | null no null Path to a YAML or JSON file containing the source connector config.
config_secret_name string | null no null The name of the secret containing the configuration.
override_execution_mode enum("docker", "python", "yaml", "auto") no "auto" Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).
manifest_path string | null no null Path to a local YAML manifest file for declarative connectors.
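
For instance, to inspect a stream's schema from a locally developed manifest, the arguments could look like the sketch below. The connector name, config field, and path are illustrative; manifest_path forces yaml execution mode:

# Illustrative arguments for a locally developed declarative connector.
schema_args = {
    "source_connector_name": "source-my-custom-api",  # hypothetical connector name
    "stream_name": "customers",
    "manifest_path": "./manifest.yaml",
    "config": {"api_key": "secret_reference::MY_API_KEY"},  # hypothetical field
}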

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_connector_name": {
      "description": "The name of the source connector.",
      "type": "string"
    },
    "stream_name": {
      "description": "The name of the stream.",
      "type": "string"
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The configuration for the source connector as a dict or JSON string."
    },
    "config_file": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a YAML or JSON file containing the source connector config."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the configuration."
    },
    "override_execution_mode": {
      "default": "auto",
      "description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
      "enum": [
        "docker",
        "python",
        "yaml",
        "auto"
      ],
      "type": "string"
    },
    "manifest_path": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a local YAML manifest file for declarative connectors."
    }
  },
  "required": [
    "source_connector_name",
    "stream_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "additionalProperties": true,
  "type": "object"
}

get_stream_previews

Hints: read-only

Get sample records (previews) from streams in a source connector.

This operation requires a valid configuration, including any required secrets.
Returns a dictionary mapping stream names to lists of sample records, or an error
message string if an error occurred for that stream.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

Parameters

Name Type Required Default Description
source_name string yes The name of the source connector.
config object | string | null no null The configuration for the source connector as a dict or JSON string.
config_file string | null no null Path to a YAML or JSON file containing the source connector config.
config_secret_name string | null no null The name of the secret containing the configuration.
streams array<string> | string | null no null The streams to get previews for. Use '*' for all streams, or None for selected streams.
limit integer no 10 The maximum number of sample records to return per stream.
override_execution_mode enum("docker", "python", "yaml", "auto") no "auto" Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).
manifest_path string | null no null Path to a local YAML manifest file for declarative connectors.
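
A small sketch of the arguments, assuming the source-faker connector and a couple of its streams (stream names and the config field are assumptions, not a verified spec):

# Illustrative arguments: preview two streams, five records each.
preview_args = {
    "source_name": "source-faker",
    "config": {"count": 100},  # assumed config field; other fields omitted
    "streams": ["users", "products"],
    "limit": 5,
}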

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_name": {
      "description": "The name of the source connector.",
      "type": "string"
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The configuration for the source connector as a dict or JSON string."
    },
    "config_file": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a YAML or JSON file containing the source connector config."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the configuration."
    },
    "streams": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The streams to get previews for. Use '*' for all streams, or None for selected streams."
    },
    "limit": {
      "default": 10,
      "description": "The maximum number of sample records to return per stream.",
      "type": "integer"
    },
    "override_execution_mode": {
      "default": "auto",
      "description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
      "enum": [
        "docker",
        "python",
        "yaml",
        "auto"
      ],
      "type": "string"
    },
    "manifest_path": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a local YAML manifest file for declarative connectors."
    }
  },
  "required": [
    "source_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "additionalProperties": {
    "anyOf": [
      {
        "items": {
          "additionalProperties": true,
          "type": "object"
        },
        "type": "array"
      },
      {
        "type": "string"
      }
    ]
  },
  "type": "object"
}

list_cached_streams

Hints: read-only · idempotent

List all streams available in the default DuckDB cache.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

Parameters

_No parameters._

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {},
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "description": "Class to hold information about a cached dataset.",
        "properties": {
          "stream_name": {
            "type": "string"
          },
          "table_name": {
            "type": "string"
          },
          "schema_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          }
        },
        "required": [
          "stream_name",
          "table_name"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

list_connector_config_secrets

Hints: read-only · idempotent

List all config_secret_name options that are known for the given connector.

This can be used to find out which already-created config secret names are available for a given connector. The return value is a list of secret names, but it will not return the actual secret values.

Parameters

Name Type Required Default Description
connector_name string yes The name of the connector.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_name": {
      "description": "The name of the connector.",
      "type": "string"
    }
  },
  "required": [
    "connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "type": "string"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

list_dotenv_secrets

Hints: read-only · idempotent

List all environment variable names declared within the registered .env files.

This returns a dictionary mapping the .env file name to a list of environment
variable names. The values of the environment variables are not returned.
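
The result might look like the following sketch (paths and variable names invented):

# Invented example of the return value shape.
example_result = {
    "/home/user/project/.env": ["POSTGRES_PASSWORD", "MOTHERDUCK_API_KEY"],
    "/home/user/project/.env.local": ["MY_API_KEY"],
}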

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

Parameters

_No parameters._

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {},
  "type": "object"
}

Show output JSON schema

{
  "additionalProperties": {
    "items": {
      "type": "string"
    },
    "type": "array"
  },
  "type": "object"
}

list_source_streams

Hints: read-only · idempotent

List all streams available in a source connector.

This operation (generally) requires a valid configuration, including any required secrets.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

Parameters

Name Type Required Default Description
source_connector_name string yes The name of the source connector.
config object | string | null no null The configuration for the source connector as a dict or JSON string.
config_file string | null no null Path to a YAML or JSON file containing the source connector config.
config_secret_name string | null no null The name of the secret containing the configuration.
override_execution_mode enum("docker", "python", "yaml", "auto") no "auto" Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).
manifest_path string | null no null Path to a local YAML manifest file for declarative connectors.
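
As a sketch, the arguments could load the connector config from a local file instead of passing it inline (the connector name and path are illustrative):

# Illustrative arguments: load the connector config from a local YAML file.
list_streams_args = {
    "source_connector_name": "source-github",
    "config_file": "./secrets/source-github-config.yaml",  # hypothetical path
    "override_execution_mode": "python",
}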

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_connector_name": {
      "description": "The name of the source connector.",
      "type": "string"
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The configuration for the source connector as a dict or JSON string."
    },
    "config_file": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a YAML or JSON file containing the source connector config."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the configuration."
    },
    "override_execution_mode": {
      "default": "auto",
      "description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
      "enum": [
        "docker",
        "python",
        "yaml",
        "auto"
      ],
      "type": "string"
    },
    "manifest_path": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a local YAML manifest file for declarative connectors."
    }
  },
  "required": [
    "source_connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "type": "string"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

read_source_stream_records

Hints: read-only

Get records from a source connector.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

Parameters

Name Type Required Default Description
source_connector_name string yes The name of the source connector.
config object | string | null no null The configuration for the source connector as a dict or JSON string.
config_file string | null no null Path to a YAML or JSON file containing the source connector config.
config_secret_name string | null no null The name of the secret containing the configuration.
stream_name string yes The name of the stream to read records from.
max_records integer no 1000 The maximum number of records to read.
override_execution_mode enum("docker", "python", "yaml", "auto") no "auto" Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).
manifest_path string | null no null Path to a local YAML manifest file for declarative connectors.
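
A minimal sketch of the arguments, assuming the source-pokeapi connector run via Docker (the config field and stream name are assumptions):

# Illustrative arguments: read up to 50 records from one stream via Docker.
read_records_args = {
    "source_connector_name": "source-pokeapi",
    "config": {"pokemon_name": "pikachu"},  # assumed config field
    "stream_name": "pokemon",
    "max_records": 50,
    "override_execution_mode": "docker",
}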

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_connector_name": {
      "description": "The name of the source connector.",
      "type": "string"
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The configuration for the source connector as a dict or JSON string."
    },
    "config_file": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a YAML or JSON file containing the source connector config."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the configuration."
    },
    "stream_name": {
      "description": "The name of the stream to read records from.",
      "type": "string"
    },
    "max_records": {
      "default": 1000,
      "description": "The maximum number of records to read.",
      "type": "integer"
    },
    "override_execution_mode": {
      "default": "auto",
      "description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
      "enum": [
        "docker",
        "python",
        "yaml",
        "auto"
      ],
      "type": "string"
    },
    "manifest_path": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a local YAML manifest file for declarative connectors."
    }
  },
  "required": [
    "source_connector_name",
    "stream_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "anyOf": [
        {
          "items": {
            "additionalProperties": true,
            "type": "object"
          },
          "type": "array"
        },
        {
          "type": "string"
        }
      ]
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

run_sql_query

Hints: read-only · idempotent

Run a SQL query against the default cache.

The dialect of SQL should match the dialect of the default cache.
Use `describe_default_cache` to see the cache type.

For DuckDB-type caches:
- Use `SHOW TABLES` to list all tables.
- Use `DESCRIBE <table_name>` to get the schema of a specific table.

For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
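
For a DuckDB-type default cache, argument payloads might look like the sketch below (the table name is hypothetical):

# Illustrative argument payloads for a DuckDB-type default cache.
show_tables_args = {"sql_query": "SHOW TABLES"}
describe_args = {"sql_query": "DESCRIBE users"}  # 'users' is a hypothetical table
select_args = {
    "sql_query": "SELECT COUNT(*) AS row_count FROM users",
    "max_records": 10,
}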

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

Parameters

Name Type Required Default Description
sql_query string yes The SQL query to execute.
max_records integer no 1000 Maximum number of records to return.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "sql_query": {
      "description": "The SQL query to execute.",
      "type": "string"
    },
    "max_records": {
      "default": 1000,
      "description": "Maximum number of records to return.",
      "type": "integer"
    }
  },
  "required": [
    "sql_query"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

sync_source_to_cache

Run a sync from a source connector to the default DuckDB cache.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

Parameters

Name Type Required Default Description
source_connector_name string yes The name of the source connector.
config object | string | null no null The configuration for the source connector as a dict or JSON string.
config_file string | null no null Path to a YAML or JSON file containing the source connector config.
config_secret_name string | null no null The name of the secret containing the configuration.
streams array<string> | string no "suggested" The streams to sync.
override_execution_mode enum("docker", "python", "yaml", "auto") no "auto" Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).
manifest_path string | null no null Path to a local YAML manifest file for declarative connectors.
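
A small sketch of the arguments, syncing two named streams instead of the 'suggested' default (connector, config field, and stream names are assumptions):

# Illustrative arguments for a sync into the default DuckDB cache.
sync_args = {
    "source_connector_name": "source-faker",
    "config": {"count": 500},  # assumed/simplified config
    "streams": ["users", "purchases"],
}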

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_connector_name": {
      "description": "The name of the source connector.",
      "type": "string"
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The configuration for the source connector as a dict or JSON string."
    },
    "config_file": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a YAML or JSON file containing the source connector config."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the configuration."
    },
    "streams": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "string"
        }
      ],
      "default": "suggested",
      "description": "The streams to sync."
    },
    "override_execution_mode": {
      "default": "auto",
      "description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
      "enum": [
        "docker",
        "python",
        "yaml",
        "auto"
      ],
      "type": "string"
    },
    "manifest_path": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a local YAML manifest file for declarative connectors."
    }
  },
  "required": [
    "source_connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

validate_connector_config

Hints: read-only · idempotent

Validate a connector configuration.

Returns a tuple of (is_valid: bool, message: str).

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

Parameters

Name Type Required Default Description
connector_name string yes The name of the connector to validate.
config object | string | null no null The configuration for the connector as a dict object or JSON string.
config_file string | null no null Path to a YAML or JSON file containing the connector configuration.
config_secret_name string | null no null The name of the secret containing the configuration.
override_execution_mode enum("docker", "python", "yaml", "auto") no "auto" Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).
manifest_path string | null no null Path to a local YAML manifest file for declarative connectors.
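
As a sketch, validation can run entirely against a config stored in a secrets manager (the connector and secret names are hypothetical):

# Illustrative arguments: validate against a stored config secret.
validate_args = {
    "connector_name": "source-github",
    "config_secret_name": "SOURCE_GITHUB_CREDS",  # hypothetical secret name
}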

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_name": {
      "description": "The name of the connector to validate.",
      "type": "string"
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The configuration for the connector as a dict object or JSON string."
    },
    "config_file": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a YAML or JSON file containing the connector configuration."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the configuration."
    },
    "override_execution_mode": {
      "default": "auto",
      "description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
      "enum": [
        "docker",
        "python",
        "yaml",
        "auto"
      ],
      "type": "string"
    },
    "manifest_path": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a local YAML manifest file for declarative connectors."
    }
  },
  "required": [
    "connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "maxItems": 2,
      "minItems": 2,
      "prefixItems": [
        {
          "type": "boolean"
        },
        {
          "type": "string"
        }
      ],
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Local MCP operations.

.. include:: ../../docs/mcp-generated/local.md
"""

# No public Python API — MCP primitives are registered via decorators and
# documented via the generated Markdown include above. Setting `__all__` to an
# empty list tells pdoc (and other doc tools) not to surface the individual
# tool / helper definitions as a redundant "API Documentation" list.
__all__: list[str] = []

import sys
import traceback
from itertools import islice
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Any, Literal

from fastmcp import FastMCP
from fastmcp_extensions import mcp_tool, register_mcp_tools
from pydantic import BaseModel, Field

from airbyte import get_source
from airbyte._util.destination_smoke_tests import (
    DestinationSmokeTestResult,
    run_destination_smoke_test,
)
from airbyte._util.meta import is_docker_installed
from airbyte.caches.util import get_default_cache
from airbyte.destinations.util import get_destination
from airbyte.mcp._arg_resolvers import resolve_connector_config, resolve_list_of_strings
from airbyte.registry import get_connector_metadata
from airbyte.secrets.config import _get_secret_sources
from airbyte.secrets.env_vars import DotenvSecretManager
from airbyte.secrets.google_gsm import GoogleGSMSecretManager
from airbyte.sources.base import Source


if TYPE_CHECKING:
    from airbyte.caches.duckdb import DuckDBCache


_CONFIG_HELP = """
You can provide `config` as JSON or a Path to a YAML/JSON file.
If a `dict` is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
`secret_reference::ENV_VAR_NAME`.

You can also provide a `config_secret_name` to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.

If `config_secret_name` is provided, it should point to a string
that contains valid JSON or YAML.

If both `config` and `config_secret_name` are provided, the
`config` will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.

For declarative connectors, you can provide a `manifest_path` to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
"""


def _get_mcp_source(
    connector_name: str,
    override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto",
    *,
    install_if_missing: bool = True,
    manifest_path: str | Path | None,
) -> Source:
    """Get the MCP source for a connector."""
    if manifest_path:
        override_execution_mode = "yaml"
    elif override_execution_mode == "auto" and is_docker_installed():
        override_execution_mode = "docker"

    source: Source
    if override_execution_mode == "auto":
        # Use defaults with no overrides
        source = get_source(
            connector_name,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "python":
        source = get_source(
            connector_name,
            use_python=True,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "docker":
        source = get_source(
            connector_name,
            docker_image=True,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "yaml":
        source = get_source(
            connector_name,
            source_manifest=manifest_path or True,
            install_if_missing=False,
        )
    else:
        raise ValueError(
            f"Unknown execution method: {override_execution_mode}. "
            "Expected one of: ['auto', 'docker', 'python', 'yaml']."
        )

    # Ensure installed:
    if install_if_missing:
        source.executor.ensure_installation()

    return source


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def validate_connector_config(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to validate."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the connector as a dict object or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the connector configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> tuple[bool, str]:
    """Validate a connector configuration.

    Returns a tuple of (is_valid: bool, message: str).
    """
    try:
        source: Source = _get_mcp_source(
            connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
    except Exception as ex:
        return False, f"Failed to get connector '{connector_name}': {ex}"

    try:
        config_dict = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
    except Exception as ex:
        return False, f"Failed to resolve configuration for {connector_name}: {ex}"

    try:
        source.check()
    except Exception as ex:
        return False, f"Configuration for {connector_name} is invalid: {ex}"

    return True, f"Configuration for {connector_name} is valid!"


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def list_connector_config_secrets(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector."),
    ],
) -> list[str]:
    """List all `config_secret_name` options that are known for the given connector.

    This can be used to find out which already-created config secret names are available
    for a given connector. The return value is a list of secret names, but it will not
    return the actual secret values.
    """
    secrets_names: list[str] = []
    for secrets_mgr in _get_secret_sources():
        if isinstance(secrets_mgr, GoogleGSMSecretManager):
            secrets_names.extend(
                [
                    secret_handle.secret_name.split("/")[-1]
                    for secret_handle in secrets_mgr.fetch_connector_secrets(connector_name)
                ]
            )

    return secrets_names


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_dotenv_secrets() -> dict[str, list[str]]:
    """List all environment variable names declared within the registered .env files.

    This returns a dictionary mapping the .env file name to a list of environment
    variable names. The values of the environment variables are not returned.
    """
    result: dict[str, list[str]] = {}
    for secrets_mgr in _get_secret_sources():
        if isinstance(secrets_mgr, DotenvSecretManager) and secrets_mgr.dotenv_path:
            result[str(secrets_mgr.dotenv_path.resolve())] = secrets_mgr.list_secrets_names()

    return result


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_source_streams(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[str]:
    """List all streams available in a source connector.

    This operation (generally) requires a valid configuration, including any required secrets.
    """
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    return source.get_available_streams()
313
314
315@mcp_tool(
316    read_only=True,
317    idempotent=True,
318    extra_help_text=_CONFIG_HELP,
319)
320def get_source_stream_json_schema(
321    source_connector_name: Annotated[
322        str,
323        Field(description="The name of the source connector."),
324    ],
325    stream_name: Annotated[
326        str,
327        Field(description="The name of the stream."),
328    ],
329    config: Annotated[
330        dict | str | None,
331        Field(
332            description="The configuration for the source connector as a dict or JSON string.",
333            default=None,
334        ),
335    ],
336    config_file: Annotated[
337        str | Path | None,
338        Field(
339            description="Path to a YAML or JSON file containing the source connector config.",
340            default=None,
341        ),
342    ],
343    config_secret_name: Annotated[
344        str | None,
345        Field(
346            description="The name of the secret containing the configuration.",
347            default=None,
348        ),
349    ],
350    override_execution_mode: Annotated[
351        Literal["docker", "python", "yaml", "auto"],
352        Field(
353            description="Optionally override the execution method to use for the connector. "
354            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
355            default="auto",
356        ),
357    ],
358    manifest_path: Annotated[
359        str | Path | None,
360        Field(
361            description="Path to a local YAML manifest file for declarative connectors.",
362            default=None,
363        ),
364    ],
365) -> dict[str, Any]:
366    """Get the JSON schema for a specific stream in a source connector."""
367    source: Source = _get_mcp_source(
368        connector_name=source_connector_name,
369        override_execution_mode=override_execution_mode,
370        manifest_path=manifest_path,
371    )
372    config_dict = resolve_connector_config(
373        config=config,
374        config_file=config_file,
375        config_secret_name=config_secret_name,
376        config_spec_jsonschema=source.config_spec,
377    )
378    source.set_config(config_dict)
379    return source.get_stream_json_schema(stream_name=stream_name)
380
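# Illustrative usage sketch, continuing the hypothetical "source-faker" example above:
# fetch the JSON schema for a single stream once the config has been resolved.
#
#     get_source_stream_json_schema(
#         source_connector_name="source-faker",
#         stream_name="users",
#         config={"count": 100},
#     )
#     # -> {"type": "object", "properties": {...}, ...}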
381
382@mcp_tool(
383    read_only=True,
384    extra_help_text=_CONFIG_HELP,
385)
386def read_source_stream_records(
387    source_connector_name: Annotated[
388        str,
389        Field(description="The name of the source connector."),
390    ],
391    config: Annotated[
392        dict | str | None,
393        Field(
394            description="The configuration for the source connector as a dict or JSON string.",
395            default=None,
396        ),
397    ],
398    config_file: Annotated[
399        str | Path | None,
400        Field(
401            description="Path to a YAML or JSON file containing the source connector config.",
402            default=None,
403        ),
404    ],
405    config_secret_name: Annotated[
406        str | None,
407        Field(
408            description="The name of the secret containing the configuration.",
409            default=None,
410        ),
411    ],
412    *,
413    stream_name: Annotated[
414        str,
415        Field(description="The name of the stream to read records from."),
416    ],
417    max_records: Annotated[
418        int,
419        Field(
420            description="The maximum number of records to read.",
421            default=1000,
422        ),
423    ],
424    override_execution_mode: Annotated[
425        Literal["docker", "python", "yaml", "auto"],
426        Field(
427            description="Optionally override the execution method to use for the connector. "
428            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
429            default="auto",
430        ),
431    ],
432    manifest_path: Annotated[
433        str | Path | None,
434        Field(
435            description="Path to a local YAML manifest file for declarative connectors.",
436            default=None,
437        ),
438    ],
439) -> list[dict[str, Any]] | str:
440    """Get records from a source connector."""
441    try:
442        source: Source = _get_mcp_source(
443            connector_name=source_connector_name,
444            override_execution_mode=override_execution_mode,
445            manifest_path=manifest_path,
446        )
447        config_dict = resolve_connector_config(
448            config=config,
449            config_file=config_file,
450            config_secret_name=config_secret_name,
451            config_spec_jsonschema=source.config_spec,
452        )
453        source.set_config(config_dict)
454        # First we get a generator for the records in the specified stream.
455        record_generator = source.get_records(stream_name)
456        # Next we load a limited number of records from the generator into our list.
457        records: list[dict[str, Any]] = list(islice(record_generator, max_records))
458
459        print(f"Retrieved {len(records)} records from stream '{stream_name}'", file=sys.stderr)
460
461    except Exception as ex:
462        tb_str = traceback.format_exc()
463        # If any error occurs, we return the error message and traceback as a string.
464        return (
465            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
466        )
467
468    else:
469        return records
470
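# Illustrative usage sketch (connector name and config are assumptions). Note that
# stream_name and max_records are keyword-only, per the bare `*` in the signature above.
#
#     read_source_stream_records(
#         source_connector_name="source-faker",
#         config={"count": 100},
#         stream_name="users",
#         max_records=50,
#     )
#     # -> up to 50 record dicts, or an error string if the read failed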
471
472@mcp_tool(
473    read_only=True,
474    extra_help_text=_CONFIG_HELP,
475)
476def get_stream_previews(
477    source_name: Annotated[
478        str,
479        Field(description="The name of the source connector."),
480    ],
481    config: Annotated[
482        dict | str | None,
483        Field(
484            description="The configuration for the source connector as a dict or JSON string.",
485            default=None,
486        ),
487    ],
488    config_file: Annotated[
489        str | Path | None,
490        Field(
491            description="Path to a YAML or JSON file containing the source connector config.",
492            default=None,
493        ),
494    ],
495    config_secret_name: Annotated[
496        str | None,
497        Field(
498            description="The name of the secret containing the configuration.",
499            default=None,
500        ),
501    ],
502    streams: Annotated[
503        list[str] | str | None,
504        Field(
505            description=(
506                "The streams to get previews for. "
507                "Use '*' for all streams, or None for selected streams."
508            ),
509            default=None,
510        ),
511    ],
512    limit: Annotated[
513        int,
514        Field(
515            description="The maximum number of sample records to return per stream.",
516            default=10,
517        ),
518    ],
519    override_execution_mode: Annotated[
520        Literal["docker", "python", "yaml", "auto"],
521        Field(
522            description="Optionally override the execution method to use for the connector. "
523            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
524            default="auto",
525        ),
526    ],
527    manifest_path: Annotated[
528        str | Path | None,
529        Field(
530            description="Path to a local YAML manifest file for declarative connectors.",
531            default=None,
532        ),
533    ],
534) -> dict[str, list[dict[str, Any]] | str]:
535    """Get sample records (previews) from streams in a source connector.
536
537    This operation requires a valid configuration, including any required secrets.
538    Returns a dictionary mapping stream names to lists of sample records, or an error
539    message string if an error occurred for that stream.
540    """
541    source: Source = _get_mcp_source(
542        connector_name=source_name,
543        override_execution_mode=override_execution_mode,
544        manifest_path=manifest_path,
545    )
546
547    config_dict = resolve_connector_config(
548        config=config,
549        config_file=config_file,
550        config_secret_name=config_secret_name,
551        config_spec_jsonschema=source.config_spec,
552    )
553    source.set_config(config_dict)
554
555    streams_param: list[str] | Literal["*"] | None = resolve_list_of_strings(
556        streams
557    )  # pyrefly: ignore[no-matching-overload]
558    if streams_param and len(streams_param) == 1 and streams_param[0] == "*":
559        streams_param = "*"
560
561    try:
562        samples_result = source.get_samples(
563            streams=streams_param,
564            limit=limit,
565            on_error="ignore",
566        )
567    except Exception as ex:
568        tb_str = traceback.format_exc()
569        return {
570            "ERROR": f"Error getting stream previews from source '{source_name}': "
571            f"{ex!r}, {ex!s}\n{tb_str}"
572        }
573
574    result: dict[str, list[dict[str, Any]] | str] = {}
575    for stream_name, dataset in samples_result.items():
576        if dataset is None:
577            result[stream_name] = f"Could not retrieve stream samples for stream '{stream_name}'"
578        else:
579            result[stream_name] = list(dataset)
580
581    return result
582
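# Illustrative usage sketch (names and values are assumptions). Passing streams="*"
# (or ["*"]) requests previews for every available stream.
#
#     get_stream_previews(
#         source_name="source-faker",
#         config={"count": 100},
#         streams="*",
#         limit=5,
#     )
#     # -> {"users": [...], "products": [...], "purchases": [...]}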
583
584@mcp_tool(
585    destructive=False,
586    extra_help_text=_CONFIG_HELP,
587)
588def sync_source_to_cache(
589    source_connector_name: Annotated[
590        str,
591        Field(description="The name of the source connector."),
592    ],
593    config: Annotated[
594        dict | str | None,
595        Field(
596            description="The configuration for the source connector as a dict or JSON string.",
597            default=None,
598        ),
599    ],
600    config_file: Annotated[
601        str | Path | None,
602        Field(
603            description="Path to a YAML or JSON file containing the source connector config.",
604            default=None,
605        ),
606    ],
607    config_secret_name: Annotated[
608        str | None,
609        Field(
610            description="The name of the secret containing the configuration.",
611            default=None,
612        ),
613    ],
614    streams: Annotated[
615        list[str] | str,
616        Field(
617            description="The streams to sync.",
618            default="suggested",
619        ),
620    ],
621    override_execution_mode: Annotated[
622        Literal["docker", "python", "yaml", "auto"],
623        Field(
624            description="Optionally override the execution method to use for the connector. "
625            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
626            default="auto",
627        ),
628    ],
629    manifest_path: Annotated[
630        str | Path | None,
631        Field(
632            description="Path to a local YAML manifest file for declarative connectors.",
633            default=None,
634        ),
635    ],
636) -> str:
637    """Run a sync from a source connector to the default DuckDB cache."""
638    source: Source = _get_mcp_source(
639        connector_name=source_connector_name,
640        override_execution_mode=override_execution_mode,
641        manifest_path=manifest_path,
642    )
643    config_dict = resolve_connector_config(
644        config=config,
645        config_file=config_file,
646        config_secret_name=config_secret_name,
647        config_spec_jsonschema=source.config_spec,
648    )
649    source.set_config(config_dict)
650    cache = get_default_cache()
651
652    streams = resolve_list_of_strings(streams)
653    if streams and len(streams) == 1 and streams[0] in {"*", "suggested"}:
654        # Collapse a single-element '*' or 'suggested' list to a plain string for special handling:
655        streams = streams[0]
656
657    if isinstance(streams, str) and streams == "suggested":
658        streams = "*"  # Fall back to all streams unless suggested streams resolve below.
659        try:
660            metadata = get_connector_metadata(
661                source_connector_name,
662            )
663        except Exception:
664            streams = "*"  # Fallback to all streams if suggested streams fail.
665        else:
666            if metadata is not None:
667                streams = metadata.suggested_streams or "*"
668
669    if isinstance(streams, str) and streams != "*":
670        streams = [streams]  # Ensure streams is a list
671
672    source.read(
673        cache=cache,
674        streams=streams,
675    )
676    del cache  # Ensure the cache is closed properly
677
678    summary: str = f"Sync completed for '{source_connector_name}'!\n\n"
679    summary += "Data written to the default DuckDB cache.\n"
680    return summary
681
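# Illustrative usage sketch (connector name and config are assumptions). With the
# default streams="suggested", the connector's suggested streams are used when the
# registry metadata provides them; otherwise the call falls back to all streams.
#
#     sync_source_to_cache(
#         source_connector_name="source-faker",
#         config={"count": 100},
#         streams="suggested",
#     )
#     # -> "Sync completed for 'source-faker'! ..."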
682
683class CachedDatasetInfo(BaseModel):
684    """Class to hold information about a cached dataset."""
685
686    stream_name: str
687    """The name of the stream in the cache."""
688    table_name: str
689    schema_name: str | None = None
690
691
692@mcp_tool(
693    read_only=True,
694    idempotent=True,
695    extra_help_text=_CONFIG_HELP,
696)
697def list_cached_streams() -> list[CachedDatasetInfo]:
698    """List all streams available in the default DuckDB cache."""
699    cache: DuckDBCache = get_default_cache()
700    result = [
701        CachedDatasetInfo(
702            stream_name=stream_name,
703            table_name=(cache.table_prefix or "") + stream_name,
704            schema_name=cache.schema_name,
705        )
706        for stream_name in cache.streams
707    ]
708    del cache  # Ensure the cache is closed properly
709    return result
710
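# Illustrative sketch of the returned shape; table and schema names depend on the
# configured default cache.
#
#     list_cached_streams()
#     # -> [CachedDatasetInfo(stream_name="users", table_name="users", schema_name=...), ...]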
711
712@mcp_tool(
713    read_only=True,
714    idempotent=True,
715    extra_help_text=_CONFIG_HELP,
716)
717def describe_default_cache() -> dict[str, Any]:
718    """Describe the currently configured default cache."""
719    cache = get_default_cache()
720    return {
721        "cache_type": type(cache).__name__,
722        "cache_dir": str(cache.cache_dir),
723        "cache_db_path": str(Path(cache.db_path).absolute()),
724        "cached_streams": list(cache.streams.keys()),
725    }
726
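# Illustrative sketch of the returned shape; the concrete cache type and paths
# depend on the local environment.
#
#     describe_default_cache()
#     # -> {
#     #        "cache_type": "DuckDBCache",
#     #        "cache_dir": "...",
#     #        "cache_db_path": "...",
#     #        "cached_streams": ["users", "products"],
#     #    }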
727
728def _is_safe_sql(sql_query: str) -> bool:
729    """Check if a SQL query is safe to execute.
730
731    For security reasons, we only allow read-only operations like SELECT, DESCRIBE, and SHOW.
732    Multi-statement queries (containing semicolons) are also disallowed for security.
733
734    Note: SQLAlchemy will also validate downstream, but this is a first-pass check.
735
736    Args:
737        sql_query: The SQL query to check
738
739    Returns:
740        True if the query is safe to execute, False otherwise
741    """
742    # Remove leading/trailing whitespace and convert to uppercase for checking
743    normalized_query = sql_query.strip().upper()
744
745    # Disallow multi-statement queries (containing semicolons)
746    # Note: We check the original query to catch semicolons anywhere, including in comments
747    if ";" in sql_query:
748        return False
749
750    # List of allowed SQL statement prefixes (read-only operations)
751    allowed_prefixes = (
752        "SELECT",
753        "DESCRIBE",
754        "DESC",  # Short form of DESCRIBE
755        "SHOW",
756        "EXPLAIN",  # Also safe - shows query execution plan
757    )
758
759    # Check if the query starts with any allowed prefix
760    return any(normalized_query.startswith(prefix) for prefix in allowed_prefixes)
761
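# A few examples of how the guard classifies queries:
#
#     _is_safe_sql("SELECT * FROM users LIMIT 10")   # True  (allowed read-only prefix)
#     _is_safe_sql("  explain select 1")             # True  (prefix check is case-insensitive)
#     _is_safe_sql("DROP TABLE users")               # False (not an allowed prefix)
#     _is_safe_sql("SELECT 1; DROP TABLE users")     # False (multi-statement query)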
762
763@mcp_tool(
764    read_only=True,
765    idempotent=True,
766    extra_help_text=_CONFIG_HELP,
767)
768def run_sql_query(
769    sql_query: Annotated[
770        str,
771        Field(description="The SQL query to execute."),
772    ],
773    max_records: Annotated[
774        int,
775        Field(
776            description="Maximum number of records to return.",
777            default=1000,
778        ),
779    ],
780) -> list[dict[str, Any]]:
781    """Run a SQL query against the default cache.
782
783    The dialect of SQL should match the dialect of the default cache.
784    Use `describe_default_cache` to see the cache type.
785
786    For DuckDB-type caches:
787    - Use `SHOW TABLES` to list all tables.
788    - Use `DESCRIBE <table_name>` to get the schema of a specific table.
789
790    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
791    """
792    # Check if the query is safe to execute
793    if not _is_safe_sql(sql_query):
794        return [
795            {
796                "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
797                "SELECT, DESCRIBE, SHOW, EXPLAIN",
798                "SQL_QUERY": sql_query,
799            }
800        ]
801
802    cache: DuckDBCache = get_default_cache()
803    try:
804        return cache.run_sql_query(
805            sql_query,
806            max_records=max_records,
807        )
808    except Exception as ex:
809        tb_str = traceback.format_exc()
810        return [
811            {
812                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
813                "TRACEBACK": tb_str,
814                "SQL_QUERY": sql_query,
815            }
816        ]
817    finally:
818        del cache  # Ensure the cache is closed properly
819
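# Illustrative usage sketch for a DuckDB-backed default cache; the table name is an
# assumption for the example.
#
#     run_sql_query("SHOW TABLES")
#     run_sql_query("DESCRIBE users")
#     run_sql_query("SELECT COUNT(*) AS n FROM users", max_records=1)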
820
821@mcp_tool(
822    destructive=True,
823)
824def destination_smoke_test(  # noqa: PLR0913, PLR0917
825    destination_connector_name: Annotated[
826        str,
827        Field(
828            description=(
829                "The name of the destination connector to test "
830                "(e.g. 'destination-snowflake', 'destination-motherduck')."
831            ),
832        ),
833    ],
834    config: Annotated[
835        dict | str | None,
836        Field(
837            description=(
838                "The destination configuration as a dict object or JSON string. "
839                "Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead."
840            ),
841            default=None,
842        ),
843    ],
844    config_file: Annotated[
845        str | Path | None,
846        Field(
847            description="Path to a YAML or JSON file containing the destination configuration.",
848            default=None,
849        ),
850    ],
851    config_secret_name: Annotated[
852        str | None,
853        Field(
854            description="The name of the secret containing the destination configuration.",
855            default=None,
856        ),
857    ],
858    scenarios: Annotated[
859        list[str] | str,
860        Field(
861            description=(
862                "Which scenarios to run. Use 'fast' (default) for all fast predefined "
863                "scenarios (excludes large_batch_stream), 'all' for every predefined "
864                "scenario including large batch, or provide a list of scenario names "
865                "or a comma-separated string."
866            ),
867            default="fast",
868        ),
869    ],
870    custom_scenarios: Annotated[
871        list[dict[str, Any]] | None,
872        Field(
873            description=(
874                "Additional custom test scenarios to inject. Each scenario should define "
875                "'name', 'json_schema', and optionally 'records' and 'primary_key'. "
876                "These are unioned with the predefined scenarios."
877            ),
878            default=None,
879        ),
880    ],
881    docker_image: Annotated[
882        str | None,
883        Field(
884            description=(
885                "Optional Docker image override for the destination connector "
886                "(e.g. 'airbyte/destination-snowflake:3.14.0')."
887            ),
888            default=None,
889        ),
890    ],
891    namespace_suffix: Annotated[
892        str | None,
893        Field(
894            description=(
895                "Optional suffix appended to the auto-generated namespace. "
896                "Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). "
897                "Use this to distinguish concurrent runs."
898            ),
899            default=None,
900        ),
901    ],
902    reuse_namespace: Annotated[
903        str | None,
904        Field(
905            description=(
906                "Exact namespace to reuse from a previous run. "
907                "When set, no new namespace is generated. "
908                "Useful for running a second test against an already-populated namespace."
909            ),
910            default=None,
911        ),
912    ],
913    skip_preflight: Annotated[
914        bool,
915        Field(
916            description=(
917                "Skip the automatic preflight check that runs basic_types before "
918                "the requested scenarios. Set to true when you expect basic_types "
919                "itself to fail or want to save time on repeated runs."
920            ),
921            default=False,
922        ),
923    ],
924) -> DestinationSmokeTestResult:
925    """Run smoke tests against a destination connector.
926
927    Sends synthetic test data from the smoke test source to the specified
928    destination and reports success or failure. The smoke test source generates
929    data across predefined scenarios covering common destination failure patterns:
930    type variations, null handling, naming edge cases, schema variations, and
931    batch sizes.
932
933    When the destination has a compatible cache implementation (DuckDB,
934    Postgres, Snowflake, BigQuery, MotherDuck), readback introspection is
935    automatically performed after a successful write. The readback produces
936    stats on the written data: table row counts, column names/types, and
937    per-column null/non-null counts. Results are included in the response
938    as `table_statistics` and `tables_not_found`.
939    """
940    # Resolve destination config
941    config_dict = resolve_connector_config(
942        config=config,
943        config_file=config_file,
944        config_secret_name=config_secret_name,
945    )
946
947    # Set up destination
948    destination_kwargs: dict[str, Any] = {
949        "name": destination_connector_name,
950        "config": config_dict,
951    }
952    if docker_image:
953        destination_kwargs["docker_image"] = docker_image
954    elif is_docker_installed():
955        destination_kwargs["docker_image"] = True
956
957    destination_obj = get_destination(**destination_kwargs)
958
959    # Resolve scenarios for the shared helper
960    resolved_scenarios: str | list[str]
961    if isinstance(scenarios, str):
962        resolved_scenarios = scenarios
963    else:
964        resolved_scenarios = resolve_list_of_strings(scenarios) or "fast"
965
966    return run_destination_smoke_test(
967        destination=destination_obj,
968        scenarios=resolved_scenarios,
969        namespace_suffix=namespace_suffix,
970        reuse_namespace=reuse_namespace,
971        custom_scenarios=custom_scenarios,
972        skip_preflight=skip_preflight,
973    )
974
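# Illustrative usage sketch: run the fast scenario set against a destination whose
# config is stored in a secrets manager. The connector name and secret name are
# assumptions for the example.
#
#     destination_smoke_test(
#         destination_connector_name="destination-duckdb",
#         config_secret_name="DESTINATION_DUCKDB_CONFIG",
#         scenarios="fast",
#     )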
975
976def register_local_tools(app: FastMCP) -> None:
977    """Register local tools with the FastMCP app.
978
979    Args:
980        app: FastMCP application instance
981    """
982    register_mcp_tools(app, mcp_module=__name__)
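# Illustrative sketch of how these tools are wired into a server. The app name is an
# assumption; `FastMCP` refers to the same class used in the annotation above.
#
#     app = FastMCP("airbyte-mcp")
#     register_local_tools(app)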