airbyte.mcp.local

Local MCP operations.

local module

MCP primitives registered by the local module of the airbyte-mcp server: 12 tools, 0 prompts, and 0 resources.

Tools (12)

describe_default_cache

Hints: read-only · idempotent

Describe the currently configured default cache.

Parameters

_No parameters._

Input JSON schema

{
  "additionalProperties": false,
  "properties": {},
  "type": "object"
}

Output JSON schema

{
  "additionalProperties": true,
  "type": "object"
}

destination_smoke_test

Hints: destructive

Run smoke tests against a destination connector.

Sends synthetic test data from the smoke test source to the specified destination and reports success or failure. The smoke test source generates data across predefined scenarios covering common destination failure patterns: type variations, null handling, naming edge cases, schema variations, and batch sizes.

When the destination has a compatible cache implementation (DuckDB, Postgres, Snowflake, BigQuery, MotherDuck), readback introspection is automatically performed after a successful write. The readback produces stats on the written data: table row counts, column names/types, and per-column null/non-null counts. Results are included in the response as table_statistics and tables_not_found.

Parameters

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| destination_connector_name | string | yes | | The name of the destination connector to test (e.g. 'destination-snowflake', 'destination-motherduck'). |
| config | object \| string \| null | no | null | The destination configuration as a dict object or JSON string. Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead. |
| config_file | string \| string (path) \| null | no | null | Path to a YAML or JSON file containing the destination configuration. |
| config_secret_name | string \| null | no | null | The name of the secret containing the destination configuration. |
| scenarios | array<string> \| string | no | "fast" | Which scenarios to run. Use 'fast' (default) for all fast predefined scenarios (excludes large_batch_stream), 'all' for every predefined scenario including large batch, or provide a list of scenario names or a comma-separated string. |
| custom_scenarios | array<object> \| null | no | null | Additional custom test scenarios to inject. Each scenario should define 'name', 'json_schema', and optionally 'records' and 'primary_key'. These are unioned with the predefined scenarios. |
| docker_image | string \| null | no | null | Optional Docker image override for the destination connector (e.g. 'airbyte/destination-snowflake:3.14.0'). |
| namespace_suffix | string \| null | no | null | Optional suffix appended to the auto-generated namespace. Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). Use this to distinguish concurrent runs. |
| reuse_namespace | string \| null | no | null | Exact namespace to reuse from a previous run. When set, no new namespace is generated. Useful for running a second test against an already-populated namespace. |
| skip_preflight | boolean | no | false | Skip the automatic preflight check that runs basic_types before the requested scenarios. Set to true when you expect basic_types itself to fail or want to save time on repeated runs. |
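
To make the namespace_suffix and scenarios parameters more concrete, here is a minimal Python sketch that builds a namespace in the documented 'zz_deleteme_yyyymmdd_hhmm_{suffix}' format and normalizes a scenarios value into a list. Both helper names are hypothetical and are not part of airbyte-mcp. The use of UTC and the whitespace trimming are assumptions, so the server's actual behavior may differ.

```python
from __future__ import annotations

from datetime import datetime, timezone


def build_namespace(suffix: str | None = None, now: datetime | None = None) -> str:
    """Hypothetical helper: mimic the documented namespace format."""
    now = now or datetime.now(timezone.utc)  # assumption: timestamps are in UTC
    return f"zz_deleteme_{now:%Y%m%d_%H%M}_{suffix or 'smoke_test'}"


def normalize_scenarios(scenarios: list[str] | str = "fast") -> list[str]:
    """Hypothetical helper: accept a list or a comma-separated string."""
    if isinstance(scenarios, str):
        # Assumption: surrounding whitespace and empty entries are ignored.
        return [name.strip() for name in scenarios.split(",") if name.strip()]
    return list(scenarios)


print(build_namespace("run_a"))
print(normalize_scenarios("basic_types, large_batch_stream"))
```

Giving each concurrent run its own suffix keeps the generated namespaces distinct even when two runs start within the same minute.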

Input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "destination_connector_name": {
      "description": "The name of the destination connector to test (e.g. 'destination-snowflake', 'destination-motherduck').",
      "type": "string"
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The destination configuration as a dict object or JSON string. Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead."
    },
    "config_file": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a YAML or JSON file containing the destination configuration."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the destination configuration."
    },
    "scenarios": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "string"
        }
      ],
      "default": "fast",
      "description": "Which scenarios to run. Use 'fast' (default) for all fast predefined scenarios (excludes large_batch_stream), 'all' for every predefined scenario including large batch, or provide a list of scenario names or a comma-separated string."
    },
    "custom_scenarios": {
      "anyOf": [
        {
          "items": {
            "additionalProperties": true,
            "type": "object"
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Additional custom test scenarios to inject. Each scenario should define 'name', 'json_schema', and optionally 'records' and 'primary_key'. These are unioned with the predefined scenarios."
    },
    "docker_image": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional Docker image override for the destination connector (e.g. 'airbyte/destination-snowflake:3.14.0')."
    },
    "namespace_suffix": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional suffix appended to the auto-generated namespace. Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). Use this to distinguish concurrent runs."
    },
    "reuse_namespace": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Exact namespace to reuse from a previous run. When set, no new namespace is generated. Useful for running a second test against an already-populated namespace."
    },
    "skip_preflight": {
      "default": false,
      "description": "Skip the automatic preflight check that runs basic_types before the requested scenarios. Set to true when you expect basic_types itself to fail or want to save time on repeated runs.",
      "type": "boolean"
    }
  },
  "required": [
    "destination_connector_name"
  ],
  "type": "object"
}

Output JSON schema

{
  "description": "Result of a destination smoke test run.",
  "properties": {
    "success": {
      "type": "boolean"
    },
    "destination": {
      "type": "string"
    },
    "namespace": {
      "type": "string"
    },
    "records_delivered": {
      "type": "integer"
    },
    "scenarios_requested": {
      "type": "string"
    },
    "elapsed_seconds": {
      "type": "number"
    },
    "error": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "preflight_passed": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "table_statistics": {
      "anyOf": [
        {
          "additionalProperties": {
            "description": "Statistics for a single table: row count, column info, and per-column stats.",
            "properties": {
              "table_name": {
                "type": "string"
              },
              "database_name": {
                "anyOf": [
                  {
                    "type": "string"
                  },
                  {
                    "type": "null"
                  }
                ],
                "default": null
              },
              "schema_name": {
                "anyOf": [
                  {
                    "type": "string"
                  },
                  {
                    "type": "null"
                  }
                ],
                "default": null
              },
              "row_count": {
                "anyOf": [
                  {
                    "type": "integer"
                  },
                  {
                    "type": "null"
                  }
                ],
                "default": null
              },
              "column_statistics": {
                "items": {
                  "description": "Null/non-null statistics for a single column.",
                  "properties": {
                    "column_name": {
                      "type": "string"
                    },
                    "column_type": {
                      "type": "string"
                    },
                    "null_count": {
                      "anyOf": [
                        {
                          "type": "integer"
                        },
                        {
                          "type": "null"
                        }
                      ],
                      "default": null
                    },
                    "non_null_count": {
                      "anyOf": [
                        {
                          "type": "integer"
                        },
                        {
                          "type": "null"
                        }
                      ],
                      "default": null
                    },
                    "total_count": {
                      "anyOf": [
                        {
                          "type": "integer"
                        },
                        {
                          "type": "null"
                        }
                      ],
                      "default": null
                    }
                  },
                  "required": [
                    "column_name",
                    "column_type"
                  ],
                  "type": "object"
                },
                "type": "array"
              }
            },
            "required": [
              "table_name",
              "column_statistics"
            ],
            "type": "object"
          },
          "type": "object"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "tables_not_found": {
      "anyOf": [
        {
          "additionalProperties": {
            "type": "string"
          },
          "type": "object"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "warnings": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    }
  },
  "required": [
    "success",
    "destination",
    "namespace",
    "records_delivered",
    "scenarios_requested",
    "elapsed_seconds"
  ],
  "type": "object"
}
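
As a rough illustration of how a client might consume this result, the Python sketch below turns a result dict into a few summary lines. The function name and the sample values are hypothetical and were not produced by a real run. The sketch relies only on the field names in the output schema above and treats the string values in tables_not_found as free-form detail, which is an assumption.

```python
from __future__ import annotations


def summarize_smoke_test(result: dict) -> list[str]:
    """Render a smoke test result dict as human-readable lines."""
    status = "passed" if result["success"] else f"failed: {result.get('error')}"
    lines = [f"{result['destination']} ({result['namespace']}): {status}"]
    # table_statistics and tables_not_found may be null or absent.
    for key, table in (result.get("table_statistics") or {}).items():
        lines.append(f"{key}: {table.get('row_count')} rows")
        for col in table["column_statistics"]:
            lines.append(
                f"  {col['column_name']} [{col['column_type']}] "
                f"nulls={col.get('null_count')}"
            )
    for key, detail in (result.get("tables_not_found") or {}).items():
        lines.append(f"not found: {key} ({detail})")
    return lines


# Hypothetical sample shaped like the output schema; not real tool output.
sample = {
    "success": True,
    "destination": "destination-motherduck",
    "namespace": "zz_deleteme_20250102_0304_smoke_test",
    "records_delivered": 3,
    "scenarios_requested": "fast",
    "elapsed_seconds": 1.5,
    "table_statistics": {
        "basic_types": {
            "table_name": "basic_types",
            "row_count": 3,
            "column_statistics": [
                {"column_name": "id", "column_type": "BIGINT", "null_count": 0}
            ],
        }
    },
    "tables_not_found": None,
}
for line in summarize_smoke_test(sample):
    print(line)
```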

get_source_stream_json_schema

Hints: read-only · idempotent

List all properties for a specific stream in a source connector.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.
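
The secret-reference and layering rules above can be pictured with a short Python sketch. It is illustrative only: the helper names are hypothetical, and the choice of a shallow merge for layering is an assumption, since the text does not say how nested keys are combined. The server's real resolution logic may differ.

```python
from __future__ import annotations

import os
from collections.abc import Mapping
from typing import Any

SECRET_PREFIX = "secret_reference::"


def resolve_secret_references(value: Any, env: Mapping[str, str] = os.environ) -> Any:
    """Replace 'secret_reference::ENV_VAR_NAME' strings with environment values."""
    if isinstance(value, dict):
        return {k: resolve_secret_references(v, env) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_secret_references(v, env) for v in value]
    if isinstance(value, str) and value.startswith(SECRET_PREFIX):
        # Raises KeyError when the referenced variable is not set.
        return env[value[len(SECRET_PREFIX):]]
    return value


def layer_configs(config: dict, secret_config: dict) -> dict:
    """Layer the secret config on top of the non-secret config.

    Assumption: a shallow merge in which secret_config keys win.
    """
    return {**config, **secret_config}


# Hypothetical values for illustration only.
fake_env = {"PG_PASSWORD": "s3cret"}
config = {"host": "localhost", "password": "secret_reference::PG_PASSWORD"}
print(resolve_secret_references(config, fake_env))
print(layer_configs({"host": "localhost", "port": 5432}, {"host": "db.internal"}))
```

Passing the environment as a parameter keeps the sketch testable without touching the real process environment.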

Parameters

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| source_connector_name | string | yes | | The name of the source connector. |
| stream_name | string | yes | | The name of the stream. |
| config | object \| string \| null | no | null | The configuration for the source connector as a dict or JSON string. |
| config_file | string \| string (path) \| null | no | null | Path to a YAML or JSON file containing the source connector config. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| override_execution_mode | enum("docker", "python", "yaml", "auto") | no | "auto" | Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used). |
| manifest_path | string \| string (path) \| null | no | null | Path to a local YAML manifest file for declarative connectors. |

Input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_connector_name": {
      "description": "The name of the source connector.",
      "type": "string"
    },
    "stream_name": {
      "description": "The name of the stream.",
      "type": "string"
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The configuration for the source connector as a dict or JSON string."
    },
    "config_file": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a YAML or JSON file containing the source connector config."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the configuration."
    },
    "override_execution_mode": {
      "default": "auto",
      "description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
      "enum": [
        "docker",
        "python",
        "yaml",
        "auto"
      ],
      "type": "string"
    },
    "manifest_path": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a local YAML manifest file for declarative connectors."
    }
  },
  "required": [
    "source_connector_name",
    "stream_name"
  ],
  "type": "object"
}

Output JSON schema

{
  "additionalProperties": true,
  "type": "object"
}

get_stream_previews

Hints: read-only

Get sample records (previews) from streams in a source connector.

This operation requires a valid configuration, including any required secrets.
Returns a dictionary mapping stream names to lists of sample records, or an error
message string if an error occurred for that stream.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

Parameters

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| source_name | string | yes | | The name of the source connector. |
| config | object \| string \| null | no | null | The configuration for the source connector as a dict or JSON string. |
| config_file | string \| string (path) \| null | no | null | Path to a YAML or JSON file containing the source connector config. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| streams | array<string> \| string \| null | no | null | The streams to get previews for. Use '*' for all streams, or None for selected streams. |
| limit | integer | no | 10 | The maximum number of sample records to return per stream. |
| override_execution_mode | enum("docker", "python", "yaml", "auto") | no | "auto" | Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used). |
| manifest_path | string \| string (path) \| null | no | null | Path to a local YAML manifest file for declarative connectors. |

Input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_name": {
      "description": "The name of the source connector.",
      "type": "string"
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The configuration for the source connector as a dict or JSON string."
    },
    "config_file": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a YAML or JSON file containing the source connector config."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the configuration."
    },
    "streams": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The streams to get previews for. Use '*' for all streams, or None for selected streams."
    },
    "limit": {
      "default": 10,
      "description": "The maximum number of sample records to return per stream.",
      "type": "integer"
    },
    "override_execution_mode": {
      "default": "auto",
      "description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
      "enum": [
        "docker",
        "python",
        "yaml",
        "auto"
      ],
      "type": "string"
    },
    "manifest_path": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a local YAML manifest file for declarative connectors."
    }
  },
  "required": [
    "source_name"
  ],
  "type": "object"
}

Output JSON schema

{
  "additionalProperties": {
    "anyOf": [
      {
        "items": {
          "additionalProperties": true,
          "type": "object"
        },
        "type": "array"
      },
      {
        "type": "string"
      }
    ]
  },
  "type": "object"
}
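
Because each value in this mapping is either a list of records or an error message string, a caller usually needs to separate the two cases. The Python sketch below shows one way to do that. The function name and the sample payload are hypothetical.

```python
from __future__ import annotations


def split_previews(previews: dict) -> tuple[dict[str, list], dict[str, str]]:
    """Separate streams with sample records from streams that returned an error."""
    records = {name: value for name, value in previews.items() if isinstance(value, list)}
    errors = {name: value for name, value in previews.items() if isinstance(value, str)}
    return records, errors


# Hypothetical payload shaped like the output schema; not real tool output.
payload = {
    "users": [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Lin"}],
    "purchases": "Stream 'purchases' could not be read.",
}
ok, failed = split_previews(payload)
print(sorted(ok), sorted(failed))
```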

list_cached_streams

Hints: read-only · idempotent

List all streams available in the default DuckDB cache.

Parameters

_No parameters._

Input JSON schema

{
  "additionalProperties": false,
  "properties": {},
  "type": "object"
}

Output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "description": "Class to hold information about a cached dataset.",
        "properties": {
          "stream_name": {
            "type": "string"
          },
          "table_name": {
            "type": "string"
          },
          "schema_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          }
        },
        "required": [
          "stream_name",
          "table_name"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}
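
The schema above wraps the list under a result key. As a small illustration, the Python sketch below unwraps that key and builds a qualified table name for each cached stream. The function name and sample payload are hypothetical, and joining schema and table with a dot is an assumption about how a caller might want to reference the tables.

```python
from __future__ import annotations


def qualified_table_names(payload: dict) -> dict[str, str]:
    """Map each stream name to 'schema.table', or just 'table' when no schema is set."""
    names: dict[str, str] = {}
    for item in payload["result"]:
        schema = item.get("schema_name")  # optional, may be null
        table = item["table_name"]
        names[item["stream_name"]] = f"{schema}.{table}" if schema else table
    return names


# Hypothetical payload shaped like the output schema; not real tool output.
payload = {
    "result": [
        {"stream_name": "users", "table_name": "users", "schema_name": "main"},
        {"stream_name": "orders", "table_name": "orders", "schema_name": None},
    ]
}
print(qualified_table_names(payload))
```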

list_connector_config_secrets

Hints: read-only · idempotent

List all config_secret_name options that are known for the given connector.

This can be used to find out which already-created config secret names are available for a given connector. The return value is a list of secret names, but it will not return the actual secret values.

Parameters

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector_name | string | yes | | The name of the connector. |

Input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_name": {
      "description": "The name of the connector.",
      "type": "string"
    }
  },
  "required": [
    "connector_name"
  ],
  "type": "object"
}

Output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "type": "string"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

list_dotenv_secrets

Hints: read-only · idempotent

List all environment variable names defined in the declared .env files.

This returns a dictionary mapping the .env file name to a list of environment
variable names. The values of the environment variables are not returned.

Parameters

_No parameters._

Input JSON schema

{
  "additionalProperties": false,
  "properties": {},
  "type": "object"
}

Output JSON schema

{
  "additionalProperties": {
    "items": {
      "type": "string"
    },
    "type": "array"
  },
  "type": "object"
}
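
Since the result maps each .env file name to the variable names it contains, a caller can check which files provide a given variable before using it in a secret_reference::ENV_VAR_NAME entry. The Python sketch below illustrates this. The function name, file names, and variable names are hypothetical.

```python
from __future__ import annotations


def files_declaring(dotenv_secrets: dict[str, list[str]], var_name: str) -> list[str]:
    """Return the .env files that declare the given environment variable name."""
    return [path for path, names in dotenv_secrets.items() if var_name in names]


# Hypothetical payload shaped like the output schema; not real tool output.
payload = {
    ".env": ["SNOWFLAKE_PASSWORD", "PG_PASSWORD"],
    "secrets/.env.local": ["PG_PASSWORD"],
}
print(files_declaring(payload, "PG_PASSWORD"))
```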

list_source_streams

Hints: read-only · idempotent

List all streams available in a source connector.

This operation (generally) requires a valid configuration, including any required secrets.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

Parameters

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| source_connector_name | string | yes | | The name of the source connector. |
| config | object \| string \| null | no | null | The configuration for the source connector as a dict or JSON string. |
| config_file | string \| string (path) \| null | no | null | Path to a YAML or JSON file containing the source connector config. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| override_execution_mode | enum("docker", "python", "yaml", "auto") | no | "auto" | Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used). |
| manifest_path | string \| string (path) \| null | no | null | Path to a local YAML manifest file for declarative connectors. |

Input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_connector_name": {
      "description": "The name of the source connector.",
      "type": "string"
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The configuration for the source connector as a dict or JSON string."
    },
    "config_file": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a YAML or JSON file containing the source connector config."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the configuration."
    },
    "override_execution_mode": {
      "default": "auto",
      "description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
      "enum": [
        "docker",
        "python",
        "yaml",
        "auto"
      ],
      "type": "string"
    },
    "manifest_path": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a local YAML manifest file for declarative connectors."
    }
  },
  "required": [
    "source_connector_name"
  ],
  "type": "object"
}

Output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "type": "string"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

read_source_stream_records

Hints: read-only

Get records from a source connector.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

Parameters

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| source_connector_name | string | yes | | The name of the source connector. |
| config | object \| string \| null | no | null | The configuration for the source connector as a dict or JSON string. |
| config_file | string \| string (path) \| null | no | null | Path to a YAML or JSON file containing the source connector config. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| stream_name | string | yes | | The name of the stream to read records from. |
| max_records | integer | no | 1000 | The maximum number of records to read. |
| override_execution_mode | enum("docker", "python", "yaml", "auto") | no | "auto" | Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used). |
| manifest_path | string \| string (path) \| null | no | null | Path to a local YAML manifest file for declarative connectors. |
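
As a sketch of how a client could assemble the arguments object for this tool, the Python helper below checks the documented enum values and omits optional parameters that are left unset. The helper name is hypothetical, the connector name, stream name, and config values are illustrative only, and how the arguments are actually sent depends on the MCP client in use.

```python
from __future__ import annotations

import json

# Allowed values taken from the override_execution_mode enum above.
EXECUTION_MODES = {"docker", "python", "yaml", "auto"}


def build_read_arguments(
    source_connector_name: str,
    stream_name: str,
    *,
    config: dict | str | None = None,
    max_records: int = 1000,
    override_execution_mode: str = "auto",
    manifest_path: str | None = None,
) -> dict:
    """Hypothetical helper: build an arguments dict for read_source_stream_records."""
    if override_execution_mode not in EXECUTION_MODES:
        raise ValueError(f"unsupported execution mode: {override_execution_mode!r}")
    arguments: dict = {
        "source_connector_name": source_connector_name,
        "stream_name": stream_name,
        "max_records": max_records,
        "override_execution_mode": override_execution_mode,
    }
    # Optional parameters default to null, so they are left out when unset.
    if config is not None:
        arguments["config"] = config
    if manifest_path is not None:
        arguments["manifest_path"] = manifest_path
    return arguments


# Illustrative values only.
print(json.dumps(build_read_arguments("source-faker", "users", config={"count": 100}, max_records=5), indent=2))
```

Keeping max_records small during exploration limits how much data is pulled from the source on each call.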

Input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_connector_name": {
      "description": "The name of the source connector.",
      "type": "string"
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The configuration for the source connector as a dict or JSON string."
    },
    "config_file": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a YAML or JSON file containing the source connector config."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the configuration."
    },
    "stream_name": {
      "description": "The name of the stream to read records from.",
      "type": "string"
    },
    "max_records": {
      "default": 1000,
      "description": "The maximum number of records to read.",
      "type": "integer"
    },
    "override_execution_mode": {
      "default": "auto",
      "description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
      "enum": [
        "docker",
        "python",
        "yaml",
        "auto"
      ],
      "type": "string"
    },
    "manifest_path": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a local YAML manifest file for declarative connectors."
    }
  },
  "required": [
    "source_connector_name",
    "stream_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "anyOf": [
        {
          "items": {
            "additionalProperties": true,
            "type": "object"
          },
          "type": "array"
        },
        {
          "type": "string"
        }
      ]
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}
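
Because `result` is either an array of record objects or a string, a caller has to branch on the type. The sketch below assumes, based on the module source, that a string result carries an error message followed by a traceback; `summarize_read_result` is a hypothetical client-side helper.

```python
from typing import Any


def summarize_read_result(payload: dict[str, Any]) -> str:
    """Summarize a wrapped tool result of the form {"result": ...}."""
    result = payload["result"]
    if isinstance(result, str):
        # String results are treated as errors; keep only the first line.
        first_line = result.split("\n", 1)[0]
        return f"error: {first_line}"
    # Otherwise the result is a list of record dicts.
    return f"{len(result)} record(s)"


print(summarize_read_result({"result": [{"id": 1}, {"id": 2}]}))
```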

run_sql_query

Hints: read-only · idempotent

Run a SQL query against the default cache.

The dialect of SQL should match the dialect of the default cache.
Use `describe_default_cache` to see the cache type.

For DuckDB-type caches:
- Use `SHOW TABLES` to list all tables.
- Use `DESCRIBE <table_name>` to get the schema of a specific table.

For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
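
As a rough illustration of the read-only restriction, a client could pre-check a statement before sending it. The check below is a naive first-keyword test and is only an assumption about how such a filter might look; the server's actual validation is not described here and may be stricter. `is_read_only_statement` is a hypothetical name.

```python
ALLOWED_KEYWORDS = ("SELECT", "DESCRIBE", "SHOW", "EXPLAIN")


def is_read_only_statement(sql_query: str) -> bool:
    """Return True if the statement starts with an allowed read-only keyword."""
    stripped = sql_query.strip()
    if not stripped:
        return False
    # Compare the first whitespace-delimited token, case-insensitively.
    first_word = stripped.split(None, 1)[0].upper()
    return first_word in ALLOWED_KEYWORDS


print(is_read_only_statement("SHOW TABLES"))
print(is_read_only_statement("DROP TABLE users"))
```

A first-keyword test cannot see what follows the keyword, so it is only suitable as a convenience check on the client side.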

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

Parameters

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| sql_query | string | yes | | The SQL query to execute. |
| max_records | integer | no | 1000 | Maximum number of records to return. |

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "sql_query": {
      "description": "The SQL query to execute.",
      "type": "string"
    },
    "max_records": {
      "default": 1000,
      "description": "Maximum number of records to return.",
      "type": "integer"
    }
  },
  "required": [
    "sql_query"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

sync_source_to_cache

Run a sync from a source connector to the default DuckDB cache.

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

Parameters

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_connector_name | string | yes | | The name of the source connector. |
| config | object \| string \| null | no | null | The configuration for the source connector as a dict or JSON string. |
| config_file | string \| string \| null | no | null | Path to a YAML or JSON file containing the source connector config. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| streams | array<string> \| string | no | "suggested" | The streams to sync. |
| override_execution_mode | enum("docker", "python", "yaml", "auto") | no | "auto" | Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used). |
| manifest_path | string \| string \| null | no | null | Path to a local YAML manifest file for declarative connectors. |

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_connector_name": {
      "description": "The name of the source connector.",
      "type": "string"
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The configuration for the source connector as a dict or JSON string."
    },
    "config_file": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a YAML or JSON file containing the source connector config."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the configuration."
    },
    "streams": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "string"
        }
      ],
      "default": "suggested",
      "description": "The streams to sync."
    },
    "override_execution_mode": {
      "default": "auto",
      "description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
      "enum": [
        "docker",
        "python",
        "yaml",
        "auto"
      ],
      "type": "string"
    },
    "manifest_path": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a local YAML manifest file for declarative connectors."
    }
  },
  "required": [
    "source_connector_name"
  ],
  "type": "object"
}
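
The `required` list and `"additionalProperties": false` in the schema above imply two simple checks a client can run before calling the tool. The sketch below hard-codes the property names from this schema; `check_arguments` is a hypothetical helper, and `source-faker` is used only as an example connector name.

```python
from typing import Any

# Property names copied from the sync_source_to_cache input schema.
SYNC_PROPERTIES = {
    "source_connector_name",
    "config",
    "config_file",
    "config_secret_name",
    "streams",
    "override_execution_mode",
    "manifest_path",
}
SYNC_REQUIRED = {"source_connector_name"}


def check_arguments(arguments: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the names are acceptable."""
    problems: list[str] = []
    for name in sorted(SYNC_REQUIRED - set(arguments)):
        problems.append(f"missing required argument: {name}")
    for name in sorted(set(arguments) - SYNC_PROPERTIES):
        # additionalProperties is false, so unknown names are rejected.
        problems.append(f"unexpected argument: {name}")
    return problems


print(check_arguments({"source_connector_name": "source-faker", "streams": ["users"]}))
```

This only validates argument names; type checks (for example that `streams` is a string or an array of strings) would still be left to a full JSON Schema validator.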

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

validate_connector_config

Hints: read-only · idempotent

Validate a connector configuration.

Returns a tuple of (is_valid: bool, message: str).

You can provide config as JSON or a Path to a YAML/JSON file. If a dict is provided, it must not contain hardcoded secrets. Instead, secrets should be provided using environment variables, and the config should reference them using the format secret_reference::ENV_VAR_NAME.

You can also provide a config_secret_name to use a specific secret name for the configuration. This is useful if you want to validate a configuration that is stored in a secrets manager.

If config_secret_name is provided, it should point to a string that contains valid JSON or YAML.

If both config and config_secret_name are provided, the config will be loaded first and then the referenced secret config will be layered on top of the non-secret config.

For declarative connectors, you can provide a manifest_path to specify a local YAML manifest file instead of using the registry version. This is useful for testing custom or locally-developed connector manifests.

Parameters

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connector_name | string | yes | | The name of the connector to validate. |
| config | object \| string \| null | no | null | The configuration for the connector as a dict object or JSON string. |
| config_file | string \| string \| null | no | null | Path to a YAML or JSON file containing the connector configuration. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| override_execution_mode | enum("docker", "python", "yaml", "auto") | no | "auto" | Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used). |
| manifest_path | string \| string \| null | no | null | Path to a local YAML manifest file for declarative connectors. |

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_name": {
      "description": "The name of the connector to validate.",
      "type": "string"
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The configuration for the connector as a dict object or JSON string."
    },
    "config_file": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a YAML or JSON file containing the connector configuration."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the configuration."
    },
    "override_execution_mode": {
      "default": "auto",
      "description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
      "enum": [
        "docker",
        "python",
        "yaml",
        "auto"
      ],
      "type": "string"
    },
    "manifest_path": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Path to a local YAML manifest file for declarative connectors."
    }
  },
  "required": [
    "connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "maxItems": 2,
      "minItems": 2,
      "prefixItems": [
        {
          "type": "boolean"
        },
        {
          "type": "string"
        }
      ],
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}
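
The output schema describes the `(is_valid, message)` tuple as a two-item array under `result`. A client-side unpacking step might look like the following sketch; `unpack_validation_result` is a hypothetical helper, and the sample payload is illustrative.

```python
from typing import Any


def unpack_validation_result(payload: dict[str, Any]) -> tuple[bool, str]:
    """Unpack {"result": [is_valid, message]} into a typed tuple."""
    result = payload["result"]
    if len(result) != 2:
        # The schema fixes the array length with minItems/maxItems = 2.
        raise ValueError(f"expected 2 items, got {len(result)}")
    is_valid, message = result
    if not isinstance(is_valid, bool) or not isinstance(message, str):
        raise TypeError("expected [boolean, string]")
    return is_valid, message


ok, msg = unpack_validation_result({"result": [False, "Configuration is invalid"]})
print(ok, msg)
```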

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Local MCP operations.

.. include:: ../../docs/mcp-generated/local.md
"""

# No public Python API: MCP primitives are registered via decorators and
# documented via the generated Markdown include above. Setting `__all__` to an
# empty list tells pdoc (and other doc tools) not to surface the individual
# tool / helper definitions as a redundant "API Documentation" list.
__all__: list[str] = []

import sys
import traceback
from itertools import islice
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Any, Literal

from fastmcp import FastMCP
from fastmcp_extensions import mcp_tool, register_mcp_tools
from pydantic import BaseModel, Field

from airbyte import get_source
from airbyte._util.destination_smoke_tests import (
    DestinationSmokeTestResult,
    run_destination_smoke_test,
)
from airbyte._util.meta import is_docker_installed
from airbyte.caches.util import get_default_cache
from airbyte.destinations.util import get_destination
from airbyte.mcp._arg_resolvers import resolve_connector_config, resolve_list_of_strings
from airbyte.registry import get_connector_metadata
from airbyte.secrets.config import _get_secret_sources
from airbyte.secrets.env_vars import DotenvSecretManager
from airbyte.secrets.google_gsm import GoogleGSMSecretManager
from airbyte.sources.base import Source


if TYPE_CHECKING:
    from airbyte.caches.duckdb import DuckDBCache


_CONFIG_HELP = """
You can provide `config` as JSON or a Path to a YAML/JSON file.
If a `dict` is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
`secret_reference::ENV_VAR_NAME`.

You can also provide a `config_secret_name` to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.

If `config_secret_name` is provided, it should point to a string
that contains valid JSON or YAML.

If both `config` and `config_secret_name` are provided, the
`config` will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.

For declarative connectors, you can provide a `manifest_path` to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
"""


def _get_mcp_source(
    connector_name: str,
    override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto",
    *,
    install_if_missing: bool = True,
    manifest_path: str | Path | None,
) -> Source:
    """Get the MCP source for a connector."""
    if manifest_path:
        override_execution_mode = "yaml"
    elif override_execution_mode == "auto" and is_docker_installed():
        override_execution_mode = "docker"

    source: Source
    if override_execution_mode == "auto":
        # Use defaults with no overrides
        source = get_source(
            connector_name,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "python":
        source = get_source(
            connector_name,
            use_python=True,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "docker":
        source = get_source(
            connector_name,
            docker_image=True,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "yaml":
        source = get_source(
            connector_name,
            source_manifest=manifest_path or True,
            install_if_missing=False,
        )
    else:
        raise ValueError(
            f"Unknown execution method: {override_execution_mode}. "
            "Expected one of: ['auto', 'docker', 'python', 'yaml']."
        )

    # Ensure installed:
    if install_if_missing:
        source.executor.ensure_installation()

    return source


@mcp_tool(
    read_only=True,
    idempotent=True,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def validate_connector_config(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to validate."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the connector as a dict object or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the connector configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> tuple[bool, str]:
    """Validate a connector configuration.

    Returns a tuple of (is_valid: bool, message: str).
    """
    try:
        source: Source = _get_mcp_source(
            connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
    except Exception as ex:
        return False, f"Failed to get connector '{connector_name}': {ex}"

    try:
        config_dict = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
    except Exception as ex:
        return False, f"Failed to resolve configuration for {connector_name}: {ex}"

    try:
        source.check()
    except Exception as ex:
        return False, f"Configuration for {connector_name} is invalid: {ex}"

    return True, f"Configuration for {connector_name} is valid!"


@mcp_tool(
    read_only=True,
    idempotent=True,
    requires_client_filesystem=True,
)
def list_connector_config_secrets(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector."),
    ],
) -> list[str]:
    """List all `config_secret_name` options that are known for the given connector.

    This can be used to find out which already-created config secret names are available
    for a given connector. The return value is a list of secret names, but it will not
    return the actual secret values.
    """
    secrets_names: list[str] = []
    for secrets_mgr in _get_secret_sources():
        if isinstance(secrets_mgr, GoogleGSMSecretManager):
            secrets_names.extend(
                [
                    secret_handle.secret_name.split("/")[-1]
                    for secret_handle in secrets_mgr.fetch_connector_secrets(connector_name)
                ]
            )

    return secrets_names


@mcp_tool(
    read_only=True,
    idempotent=True,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def list_dotenv_secrets() -> dict[str, list[str]]:
    """List all environment variable names declared within declared .env files.

    This returns a dictionary mapping the .env file name to a list of environment
    variable names. The values of the environment variables are not returned.
    """
    result: dict[str, list[str]] = {}
    for secrets_mgr in _get_secret_sources():
        if isinstance(secrets_mgr, DotenvSecretManager) and secrets_mgr.dotenv_path:
            result[str(secrets_mgr.dotenv_path.resolve())] = secrets_mgr.list_secrets_names()

    return result


@mcp_tool(
    read_only=True,
    idempotent=True,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def list_source_streams(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[str]:
    """List all streams available in a source connector.

    This operation (generally) requires a valid configuration, including any required secrets.
    """
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    return source.get_available_streams()


@mcp_tool(
    read_only=True,
    idempotent=True,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def get_source_stream_json_schema(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """List all properties for a specific stream in a source connector."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    return source.get_stream_json_schema(stream_name=stream_name)


@mcp_tool(
    read_only=True,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def read_source_stream_records(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to read records from."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="The maximum number of records to read.",
            default=1000,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[dict[str, Any]] | str:
    """Get records from a source connector."""
    try:
        source: Source = _get_mcp_source(
            connector_name=source_connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
        config_dict = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
        # First we get a generator for the records in the specified stream.
        record_generator = source.get_records(stream_name)
        # Next we load a limited number of records from the generator into our list.
        records: list[dict[str, Any]] = list(islice(record_generator, max_records))

        # Log to stderr; `file=` is required, otherwise sys.stderr is printed as a value.
        print(f"Retrieved {len(records)} records from stream '{stream_name}'", file=sys.stderr)

    except Exception as ex:
        tb_str = traceback.format_exc()
        # If any error occurs, we return the error message and traceback as a string.
        return (
            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
        )

    else:
        return records


@mcp_tool(
    read_only=True,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def get_stream_previews(
    source_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str | None,
        Field(
            description=(
                "The streams to get previews for. "
                "Use '*' for all streams, or None for selected streams."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="The maximum number of sample records to return per stream.",
            default=10,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, list[dict[str, Any]] | str]:
    """Get sample records (previews) from streams in a source connector.

    This operation requires a valid configuration, including any required secrets.
    Returns a dictionary mapping stream names to lists of sample records, or an error
    message string if an error occurred for that stream.
    """
    source: Source = _get_mcp_source(
        connector_name=source_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )

    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)

    streams_param: list[str] | Literal["*"] | None = resolve_list_of_strings(
        streams
    )  # pyrefly: ignore[no-matching-overload]
    if streams_param and len(streams_param) == 1 and streams_param[0] == "*":
        streams_param = "*"

    try:
        samples_result = source.get_samples(
            streams=streams_param,
            limit=limit,
            on_error="ignore",
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return {
            "ERROR": f"Error getting stream previews from source '{source_name}': "
            f"{ex!r}, {ex!s}\n{tb_str}"
        }

    result: dict[str, list[dict[str, Any]] | str] = {}
    for stream_name, dataset in samples_result.items():
        if dataset is None:
            result[stream_name] = f"Could not retrieve stream samples for stream '{stream_name}'"
        else:
            result[stream_name] = list(dataset)

    return result


@mcp_tool(
    destructive=False,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def sync_source_to_cache(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str,
        Field(
            description="The streams to sync.",
            default="suggested",
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> str:
    """Run a sync from a source connector to the default DuckDB cache."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    cache = get_default_cache()

    streams = resolve_list_of_strings(streams)
    if streams and len(streams) == 1 and streams[0] in {"*", "suggested"}:
        # Float '*' and 'suggested' to the top-level for special processing:
        streams = streams[0]

    if isinstance(streams, str) and streams == "suggested":
        streams = "*"  # Default to all streams if 'suggested' is not otherwise specified.
        try:
            metadata = get_connector_metadata(
                source_connector_name,
            )
        except Exception:
            streams = "*"  # Fallback to all streams if suggested streams fail.
        else:
            if metadata is not None:
                streams = metadata.suggested_streams or "*"

    if isinstance(streams, str) and streams != "*":
        streams = [streams]  # Ensure streams is a list

    source.read(
        cache=cache,
        streams=streams,
    )
    del cache  # Ensure the cache is closed properly

    summary: str = f"Sync completed for '{source_connector_name}'!\n\n"
    summary += "Data written to default DuckDB cache\n"
    return summary


class CachedDatasetInfo(BaseModel):
    """Class to hold information about a cached dataset."""

    stream_name: str
    """The name of the stream in the cache."""
    table_name: str
    schema_name: str | None = None


@mcp_tool(
    read_only=True,
    idempotent=True,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def list_cached_streams() -> list[CachedDatasetInfo]:
    """List all streams available in the default DuckDB cache."""
    cache: DuckDBCache = get_default_cache()
    result = [
        CachedDatasetInfo(
            stream_name=stream_name,
            table_name=(cache.table_prefix or "") + stream_name,
            schema_name=cache.schema_name,
        )
        for stream_name in cache.streams
    ]
    del cache  # Ensure the cache is closed properly
    return result


@mcp_tool(
    read_only=True,
    idempotent=True,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def describe_default_cache() -> dict[str, Any]:
    """Describe the currently configured default cache."""
    cache = get_default_cache()
    return {
        "cache_type": type(cache).__name__,
        "cache_dir": str(cache.cache_dir),
        "cache_db_path": str(Path(cache.db_path).absolute()),
        "cached_streams": list(cache.streams.keys()),
    }


def _is_safe_sql(sql_query: str) -> bool:
    """Check if a SQL query is safe to execute.

    For security reasons, we only allow read-only operations like SELECT, DESCRIBE, and SHOW.
    Multi-statement queries (containing semicolons) are also disallowed for security.

    Note: SQLAlchemy will also validate downstream, but this is a first-pass check.

    Args:
        sql_query: The SQL query to check

    Returns:
        True if the query is safe to execute, False otherwise
    """
    # Remove leading/trailing whitespace and convert to uppercase for checking
    normalized_query = sql_query.strip().upper()

    # Disallow multi-statement queries (containing semicolons)
    # Note: We check the original query to catch semicolons anywhere, including in comments
    if ";" in sql_query:
        return False

    # List of allowed SQL statement prefixes (read-only operations)
    allowed_prefixes = (
        "SELECT",
        "DESCRIBE",
        "DESC",  # Short form of DESCRIBE
        "SHOW",
        "EXPLAIN",  # Also safe - shows query execution plan
    )

    # Check if the query starts with any allowed prefix
    return any(normalized_query.startswith(prefix) for prefix in allowed_prefixes)


@mcp_tool(
    read_only=True,
    idempotent=True,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def run_sql_query(
    sql_query: Annotated[
        str,
        Field(description="The SQL query to execute."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="Maximum number of records to return.",
            default=1000,
        ),
    ],
) -> list[dict[str, Any]]:
    """Run a SQL query against the default cache.

    The dialect of SQL should match the dialect of the default cache.
    Use `describe_default_cache` to see the cache type.

    For DuckDB-type caches:
    - Use `SHOW TABLES` to list all tables.
    - Use `DESCRIBE <table_name>` to get the schema of a specific table

    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
    """
    # Check if the query is safe to execute
    if not _is_safe_sql(sql_query):
        return [
            {
                "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
                "SELECT, DESCRIBE, SHOW, EXPLAIN",
                "SQL_QUERY": sql_query,
            }
        ]

    cache: DuckDBCache = get_default_cache()
    try:
        return cache.run_sql_query(
            sql_query,
            max_records=max_records,
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return [
            {
                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
                "TRACEBACK": tb_str,
                "SQL_QUERY": sql_query,
            }
        ]
    finally:
        del cache  # Ensure the cache is closed properly


@mcp_tool(
    destructive=True,
    requires_client_filesystem=True,
)
def destination_smoke_test(  # noqa: PLR0913, PLR0917
    destination_connector_name: Annotated[
        str,
        Field(
            description=(
                "The name of the destination connector to test "
                "(e.g. 'destination-snowflake', 'destination-motherduck')."
            ),
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description=(
                "The destination configuration as a dict object or JSON string. "
                "Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead."
            ),
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the destination configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the destination configuration.",
            default=None,
        ),
    ],
    scenarios: Annotated[
        list[str] | str,
        Field(
            description=(
                "Which scenarios to run. Use 'fast' (default) for all fast predefined "
                "scenarios (excludes large_batch_stream), 'all' for every predefined "
                "scenario including large batch, or provide a list of scenario names "
                "or a comma-separated string."
            ),
            default="fast",
        ),
    ],
    custom_scenarios: Annotated[
        list[dict[str, Any]] | None,
        Field(
            description=(
                "Additional custom test scenarios to inject. Each scenario should define "
                "'name', 'json_schema', and optionally 'records' and 'primary_key'. "
                "These are unioned with the predefined scenarios."
            ),
            default=None,
        ),
    ],
    docker_image: Annotated[
        str | None,
        Field(
            description=(
                "Optional Docker image override for the destination connector "
                "(e.g. 'airbyte/destination-snowflake:3.14.0')."
            ),
            default=None,
        ),
    ],
    namespace_suffix: Annotated[
        str | None,
        Field(
            description=(
                "Optional suffix appended to the auto-generated namespace. "
                "Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). "
                "Use this to distinguish concurrent runs."
            ),
            default=None,
        ),
    ],
    reuse_namespace: Annotated[
        str | None,
        Field(
            description=(
                "Exact namespace to reuse from a previous run. "
                "When set, no new namespace is generated. "
                "Useful for running a second test against an already-populated namespace."
            ),
            default=None,
        ),
    ],
    skip_preflight: Annotated[
        bool,
        Field(
            description=(
                "Skip the automatic preflight check that runs basic_types before "
                "the requested scenarios. Set to true when you expect basic_types "
                "itself to fail or want to save time on repeated runs."
            ),
            default=False,
        ),
    ],
) -> DestinationSmokeTestResult:
    """Run smoke tests against a destination connector.

    Sends synthetic test data from the smoke test source to the specified
    destination and reports success or failure. The smoke test source generates
    data across predefined scenarios covering common destination failure patterns:
    type variations, null handling, naming edge cases, schema variations, and
    batch sizes.

    When the destination has a compatible cache implementation (DuckDB,
    Postgres, Snowflake, BigQuery, MotherDuck), readback introspection is
    automatically performed after a successful write. The readback produces
    stats on the written data: table row counts, column names/types, and
    per-column null/non-null counts. Results are included in the response
    as `table_statistics` and `tables_not_found`.
    """
    # Resolve destination config
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
    )

    # Set up destination
    destination_kwargs: dict[str, Any] = {
        "name": destination_connector_name,
        "config": config_dict,
    }
    if docker_image:
        destination_kwargs["docker_image"] = docker_image
    elif is_docker_installed():
        destination_kwargs["docker_image"] = True

    destination_obj = get_destination(**destination_kwargs)

    # Resolve scenarios for the shared helper
    resolved_scenarios: str | list[str]
    if isinstance(scenarios, str):
        resolved_scenarios = scenarios
    else:
        resolved_scenarios = resolve_list_of_strings(scenarios) or "fast"

    return run_destination_smoke_test(
        destination=destination_obj,
        scenarios=resolved_scenarios,
        namespace_suffix=namespace_suffix,
        reuse_namespace=reuse_namespace,
        custom_scenarios=custom_scenarios,
        skip_preflight=skip_preflight,
    )


def register_local_tools(app: FastMCP) -> None:
    """Register local tools with the FastMCP app.

    Args:
        app: FastMCP application instance
    """
    register_mcp_tools(app, mcp_module=__name__)
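
The read-only guard that run_sql_query applies before touching the cache can be tried in isolation. The sketch below is a standalone copy of the check performed by _is_safe_sql in the listing above, for illustration only; the public name is_safe_sql and the sample queries are assumptions made for this example and are not part of the module. It shows which inputs the prefix-and-semicolon check would accept or reject.

```python
def is_safe_sql(sql_query: str) -> bool:
    """Standalone copy of the first-pass check, for illustration only."""
    # Any semicolon rejects the query, even a trailing one or one in a literal.
    if ";" in sql_query:
        return False
    # Same read-only prefixes as in the listing above.
    allowed_prefixes = ("SELECT", "DESCRIBE", "DESC", "SHOW", "EXPLAIN")
    # str.startswith accepts a tuple of candidate prefixes.
    return sql_query.strip().upper().startswith(allowed_prefixes)


# Hypothetical sample queries; table names are made up for the example.
samples = [
    "select * from users",                   # lowercase is normalized
    "   SHOW TABLES",                        # leading whitespace is stripped
    "DROP TABLE users",                      # not a read-only prefix
    "SELECT 1; DROP TABLE users",            # multi-statement
    "SELECT 1;",                             # trailing semicolon
    "WITH t AS (SELECT 1) SELECT * FROM t",  # CTE starts with WITH
]

for query in samples:
    print(f"{is_safe_sql(query)!s:5}  {query!r}")
```

Under this check a query with a trailing semicolon, or one that opens with a WITH clause, is rejected even though it only reads data, so callers would likely need to drop the terminator and phrase such queries so that they begin with SELECT.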