airbyte.mcp.local
Local MCP operations.
local module
MCP primitives registered by the local module of the airbyte-mcp server: 12 tool(s), 0 prompt(s), 0 resource(s).
Tools (12)
describe_default_cache
Hints: read-only · idempotent
Describe the currently configured default cache.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
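The secret_reference::ENV_VAR_NAME convention described above can be illustrated with a short sketch. The resolution helper below is illustrative only, not the server's actual implementation:

```python
import os

SECRET_PREFIX = "secret_reference::"

def resolve_secret_references(config: dict) -> dict:
    """Replace secret_reference::ENV_VAR_NAME string values with env var contents.

    Illustrative sketch of the documented convention, not the server's code.
    """
    resolved = {}
    for key, value in config.items():
        if isinstance(value, str) and value.startswith(SECRET_PREFIX):
            env_var = value[len(SECRET_PREFIX):]
            resolved[key] = os.environ[env_var]  # raises KeyError if the env var is unset
        elif isinstance(value, dict):
            resolved[key] = resolve_secret_references(value)  # recurse into nested configs
        else:
            resolved[key] = value
    return resolved

# A config free of hardcoded secrets: the API key is referenced, not embedded.
os.environ["MY_API_KEY"] = "s3cr3t"  # normally set outside the process
config = {"api_key": "secret_reference::MY_API_KEY", "start_date": "2024-01-01"}
resolved = resolve_secret_references(config)
```

This keeps secret material out of the config payload itself; only the environment variable name travels with the config.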
Parameters
_No parameters._
Show input JSON schema
{
"additionalProperties": false,
"properties": {},
"type": "object"
}
Show output JSON schema
{
"additionalProperties": true,
"type": "object"
}
destination_smoke_test
Hints: destructive
Run smoke tests against a destination connector.
Sends synthetic test data from the smoke test source to the specified destination and reports success or failure. The smoke test source generates data across predefined scenarios covering common destination failure patterns: type variations, null handling, naming edge cases, schema variations, and batch sizes.
When the destination has a compatible cache implementation (DuckDB,
Postgres, Snowflake, BigQuery, MotherDuck), readback introspection is
automatically performed after a successful write. The readback produces
stats on the written data: table row counts, column names/types, and
per-column null/non-null counts. Results are included in the response
as table_statistics and tables_not_found.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| destination_connector_name | string | yes | — | The name of the destination connector to test (e.g. 'destination-snowflake', 'destination-motherduck'). |
| config | object \| string \| null | no | null | The destination configuration as a dict object or JSON string. Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead. |
| config_file | string \| null | no | null | Path to a YAML or JSON file containing the destination configuration. |
| config_secret_name | string \| null | no | null | The name of the secret containing the destination configuration. |
| scenarios | array<string> \| string | no | "fast" | Which scenarios to run. Use 'fast' (default) for all fast predefined scenarios (excludes large_batch_stream), 'all' for every predefined scenario including large batch, or provide a list of scenario names or a comma-separated string. |
| custom_scenarios | array<object> \| null | no | null | Additional custom test scenarios to inject. Each scenario should define 'name', 'json_schema', and optionally 'records' and 'primary_key'. These are unioned with the predefined scenarios. |
| docker_image | string \| null | no | null | Optional Docker image override for the destination connector (e.g. 'airbyte/destination-snowflake:3.14.0'). |
| namespace_suffix | string \| null | no | null | Optional suffix appended to the auto-generated namespace. Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). Use this to distinguish concurrent runs. |
| reuse_namespace | string \| null | no | null | Exact namespace to reuse from a previous run. When set, no new namespace is generated. Useful for running a second test against an already-populated namespace. |
| skip_preflight | boolean | no | false | Skip the automatic preflight check that runs basic_types before the requested scenarios. Set to true when you expect basic_types itself to fail or want to save time on repeated runs. |
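A custom scenario, per the custom_scenarios description, supplies 'name' and 'json_schema', and optionally 'records' and 'primary_key'. The example below is hypothetical (the stream content and scenario name are made up); only the field names follow the parameter description:

```python
# Hypothetical custom scenario for destination_smoke_test's custom_scenarios
# parameter. Field names ('name', 'json_schema', 'records', 'primary_key')
# follow the documented shape; the schema and records are invented.
custom_scenario = {
    "name": "nullable_notes",
    "json_schema": {
        "type": "object",
        "properties": {
            "id": {"type": "integer"},
            "notes": {"type": ["string", "null"]},  # exercises null handling
        },
    },
    "primary_key": ["id"],
    "records": [
        {"id": 1, "notes": "first"},
        {"id": 2, "notes": None},
    ],
}
```

Custom scenarios like this are unioned with the predefined ones, so a run can target a destination-specific edge case without losing the standard coverage.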
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"destination_connector_name": {
"description": "The name of the destination connector to test (e.g. 'destination-snowflake', 'destination-motherduck').",
"type": "string"
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The destination configuration as a dict object or JSON string. Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead."
},
"config_file": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a YAML or JSON file containing the destination configuration."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the destination configuration."
},
"scenarios": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "string"
}
],
"default": "fast",
"description": "Which scenarios to run. Use 'fast' (default) for all fast predefined scenarios (excludes large_batch_stream), 'all' for every predefined scenario including large batch, or provide a list of scenario names or a comma-separated string."
},
"custom_scenarios": {
"anyOf": [
{
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "Additional custom test scenarios to inject. Each scenario should define 'name', 'json_schema', and optionally 'records' and 'primary_key'. These are unioned with the predefined scenarios."
},
"docker_image": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional Docker image override for the destination connector (e.g. 'airbyte/destination-snowflake:3.14.0')."
},
"namespace_suffix": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional suffix appended to the auto-generated namespace. Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). Use this to distinguish concurrent runs."
},
"reuse_namespace": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Exact namespace to reuse from a previous run. When set, no new namespace is generated. Useful for running a second test against an already-populated namespace."
},
"skip_preflight": {
"default": false,
"description": "Skip the automatic preflight check that runs basic_types before the requested scenarios. Set to true when you expect basic_types itself to fail or want to save time on repeated runs.",
"type": "boolean"
}
},
"required": [
"destination_connector_name"
],
"type": "object"
}
Show output JSON schema
{
"description": "Result of a destination smoke test run.",
"properties": {
"success": {
"type": "boolean"
},
"destination": {
"type": "string"
},
"namespace": {
"type": "string"
},
"records_delivered": {
"type": "integer"
},
"scenarios_requested": {
"type": "string"
},
"elapsed_seconds": {
"type": "number"
},
"error": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"preflight_passed": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null
},
"table_statistics": {
"anyOf": [
{
"additionalProperties": {
"description": "Statistics for a single table: row count, column info, and per-column stats.",
"properties": {
"table_name": {
"type": "string"
},
"database_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"schema_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"row_count": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null
},
"column_statistics": {
"items": {
"description": "Null/non-null statistics for a single column.",
"properties": {
"column_name": {
"type": "string"
},
"column_type": {
"type": "string"
},
"null_count": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null
},
"non_null_count": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null
},
"total_count": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null
}
},
"required": [
"column_name",
"column_type"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"table_name",
"column_statistics"
],
"type": "object"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null
},
"tables_not_found": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null
},
"warnings": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null
}
},
"required": [
"success",
"destination",
"namespace",
"records_delivered",
"scenarios_requested",
"elapsed_seconds"
],
"type": "object"
}
get_source_stream_json_schema
Hints: read-only · idempotent
List all properties for a specific stream in a source connector.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
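The layering rule described above (the referenced secret config layered on top of the non-secret config) amounts to a recursive merge where secret values win. A sketch under that assumption; the server's exact merge semantics may differ:

```python
def layer_configs(base: dict, secret: dict) -> dict:
    """Layer the secret config on top of the non-secret base config.

    Values from `secret` win; nested dicts are merged recursively. This is an
    illustration of the documented layering rule, not the server's exact merge.
    """
    merged = dict(base)
    for key, value in secret.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = layer_configs(merged[key], value)
        else:
            merged[key] = value
    return merged

base = {"host": "db.example.com", "credentials": {"username": "reader"}}
secret = {"credentials": {"password": "hunter2"}}
merged = layer_configs(base, secret)
```

This split lets the non-secret part of a config live in version control while the sensitive part stays in a secrets manager.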
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_connector_name | string | yes | — | The name of the source connector. |
| stream_name | string | yes | — | The name of the stream. |
| config | object \| string \| null | no | null | The configuration for the source connector as a dict or JSON string. |
| config_file | string \| null | no | null | Path to a YAML or JSON file containing the source connector config. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| override_execution_mode | enum("docker", "python", "yaml", "auto") | no | "auto" | Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used). |
| manifest_path | string \| null | no | null | Path to a local YAML manifest file for declarative connectors. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_connector_name": {
"description": "The name of the source connector.",
"type": "string"
},
"stream_name": {
"description": "The name of the stream.",
"type": "string"
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The configuration for the source connector as a dict or JSON string."
},
"config_file": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a YAML or JSON file containing the source connector config."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
"override_execution_mode": {
"default": "auto",
"description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
"enum": [
"docker",
"python",
"yaml",
"auto"
],
"type": "string"
},
"manifest_path": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a local YAML manifest file for declarative connectors."
}
},
"required": [
"source_connector_name",
"stream_name"
],
"type": "object"
}
Show output JSON schema
{
"additionalProperties": true,
"type": "object"
}
get_stream_previews
Hints: read-only
Get sample records (previews) from streams in a source connector.
This operation requires a valid configuration, including any required secrets.
Returns a dictionary mapping stream names to lists of sample records, or an error
message string if an error occurred for that stream.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_name | string | yes | — | The name of the source connector. |
| config | object \| string \| null | no | null | The configuration for the source connector as a dict or JSON string. |
| config_file | string \| null | no | null | Path to a YAML or JSON file containing the source connector config. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| streams | array<string> \| string \| null | no | null | The streams to get previews for. Use '*' for all streams, or None for selected streams. |
| limit | integer | no | 10 | The maximum number of sample records to return per stream. |
| override_execution_mode | enum("docker", "python", "yaml", "auto") | no | "auto" | Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used). |
| manifest_path | string \| null | no | null | Path to a local YAML manifest file for declarative connectors. |
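Per the output schema, each stream name maps either to a list of sample records or to an error string for that stream. A hypothetical result, with a small helper showing how a caller might distinguish the two cases:

```python
# Hypothetical get_stream_previews result: stream names and records are
# invented for illustration. Successful streams map to record lists; failed
# streams map to an error string, per the output schema.
previews = {
    "users": [{"id": 1, "email": "a@example.com"}, {"id": 2, "email": "b@example.com"}],
    "orders": "Error: stream requires additional permissions",
}

def summarize(previews: dict) -> dict:
    """Count records per stream; flag streams that returned an error string."""
    return {
        name: len(val) if isinstance(val, list) else "error"
        for name, val in previews.items()
    }
```

Because errors are reported per stream, one failing stream does not hide the previews from the streams that succeeded.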
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_name": {
"description": "The name of the source connector.",
"type": "string"
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The configuration for the source connector as a dict or JSON string."
},
"config_file": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a YAML or JSON file containing the source connector config."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
"streams": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The streams to get previews for. Use '*' for all streams, or None for selected streams."
},
"limit": {
"default": 10,
"description": "The maximum number of sample records to return per stream.",
"type": "integer"
},
"override_execution_mode": {
"default": "auto",
"description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
"enum": [
"docker",
"python",
"yaml",
"auto"
],
"type": "string"
},
"manifest_path": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a local YAML manifest file for declarative connectors."
}
},
"required": [
"source_name"
],
"type": "object"
}
Show output JSON schema
{
"additionalProperties": {
"anyOf": [
{
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
},
{
"type": "string"
}
]
},
"type": "object"
}
list_cached_streams
Hints: read-only · idempotent
List all streams available in the default DuckDB cache.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
Parameters
_No parameters._
Show input JSON schema
{
"additionalProperties": false,
"properties": {},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"description": "Class to hold information about a cached dataset.",
"properties": {
"stream_name": {
"type": "string"
},
"table_name": {
"type": "string"
},
"schema_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
}
},
"required": [
"stream_name",
"table_name"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
list_connector_config_secrets
Hints: read-only · idempotent
List all config_secret_name options that are known for the given connector.
This can be used to find out which already-created config secret names are available for a given connector. The return value is a list of secret names, but it will not return the actual secret values.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connector_name | string | yes | — | The name of the connector. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connector_name": {
"description": "The name of the connector.",
"type": "string"
}
},
"required": [
"connector_name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"type": "string"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
list_dotenv_secrets
Hints: read-only · idempotent
List all environment variable names found in the declared .env files.
This returns a dictionary mapping the .env file name to a list of environment
variable names. The values of the environment variables are not returned.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
Parameters
_No parameters._
Show input JSON schema
{
"additionalProperties": false,
"properties": {},
"type": "object"
}
Show output JSON schema
{
"additionalProperties": {
"items": {
"type": "string"
},
"type": "array"
},
"type": "object"
}
list_source_streams
Hints: read-only · idempotent
List all streams available in a source connector.
This operation (generally) requires a valid configuration, including any required secrets.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_connector_name | string | yes | — | The name of the source connector. |
| config | object \| string \| null | no | null | The configuration for the source connector as a dict or JSON string. |
| config_file | string \| null | no | null | Path to a YAML or JSON file containing the source connector config. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| override_execution_mode | enum("docker", "python", "yaml", "auto") | no | "auto" | Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used). |
| manifest_path | string \| null | no | null | Path to a local YAML manifest file for declarative connectors. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_connector_name": {
"description": "The name of the source connector.",
"type": "string"
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The configuration for the source connector as a dict or JSON string."
},
"config_file": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a YAML or JSON file containing the source connector config."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
"override_execution_mode": {
"default": "auto",
"description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
"enum": [
"docker",
"python",
"yaml",
"auto"
],
"type": "string"
},
"manifest_path": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a local YAML manifest file for declarative connectors."
}
},
"required": [
"source_connector_name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"type": "string"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
read_source_stream_records
Hints: read-only
Get records from a source connector.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_connector_name | string | yes | — | The name of the source connector. |
| config | object \| string \| null | no | null | The configuration for the source connector as a dict or JSON string. |
| config_file | string \| null | no | null | Path to a YAML or JSON file containing the source connector config. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| stream_name | string | yes | — | The name of the stream to read records from. |
| max_records | integer | no | 1000 | The maximum number of records to read. |
| override_execution_mode | enum("docker", "python", "yaml", "auto") | no | "auto" | Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used). |
| manifest_path | string \| null | no | null | Path to a local YAML manifest file for declarative connectors. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_connector_name": {
"description": "The name of the source connector.",
"type": "string"
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The configuration for the source connector as a dict or JSON string."
},
"config_file": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a YAML or JSON file containing the source connector config."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
"stream_name": {
"description": "The name of the stream to read records from.",
"type": "string"
},
"max_records": {
"default": 1000,
"description": "The maximum number of records to read.",
"type": "integer"
},
"override_execution_mode": {
"default": "auto",
"description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
"enum": [
"docker",
"python",
"yaml",
"auto"
],
"type": "string"
},
"manifest_path": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a local YAML manifest file for declarative connectors."
}
},
"required": [
"source_connector_name",
"stream_name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"anyOf": [
{
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
},
{
"type": "string"
}
]
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
run_sql_query
Hints: read-only · idempotent
Run a SQL query against the default cache.
The dialect of SQL should match the dialect of the default cache.
Use `describe_default_cache` to see the cache type.
For DuckDB-type caches:
- Use `SHOW TABLES` to list all tables.
- Use `DESCRIBE <table_name>` to get the schema of a specific table.
For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| sql_query | string | yes | — | The SQL query to execute. |
| max_records | integer | no | 1000 | Maximum number of records to return. |
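The read-only restriction (SELECT, DESCRIBE, SHOW, EXPLAIN) can be approximated with a first-keyword check. This is a sketch only, not the server's actual guard; a real implementation must handle more cases (e.g. a SELECT wrapped in a `WITH` common table expression, or multiple statements):

```python
ALLOWED_PREFIXES = ("SELECT", "DESCRIBE", "SHOW", "EXPLAIN")

def is_read_only(sql_query: str) -> bool:
    """Return True if the query starts with an allowed read-only keyword.

    Illustrative sketch of the documented restriction. Note it would reject a
    legitimate `WITH ... SELECT ...` query; a real guard needs proper parsing.
    """
    stripped = sql_query.lstrip()
    if not stripped:
        return False
    first_word = stripped.split(None, 1)[0].upper()
    return first_word in ALLOWED_PREFIXES
```

Usage: `is_read_only("SHOW TABLES")` is accepted, while a mutating statement such as `"DROP TABLE users"` is rejected.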
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"sql_query": {
"description": "The SQL query to execute.",
"type": "string"
},
"max_records": {
"default": 1000,
"description": "Maximum number of records to return.",
"type": "integer"
}
},
"required": [
"sql_query"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
sync_source_to_cache
Run a sync from a source connector to the default DuckDB cache.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_connector_name | string | yes | — | The name of the source connector. |
| config | object \| string \| null | no | null | The configuration for the source connector as a dict or JSON string. |
| config_file | string \| null | no | null | Path to a YAML or JSON file containing the source connector config. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| streams | array<string> \| string | no | "suggested" | The streams to sync. |
| override_execution_mode | enum("docker", "python", "yaml", "auto") | no | "auto" | Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used). |
| manifest_path | string \| null | no | null | Path to a local YAML manifest file for declarative connectors. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_connector_name": {
"description": "The name of the source connector.",
"type": "string"
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The configuration for the source connector as a dict or JSON string."
},
"config_file": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a YAML or JSON file containing the source connector config."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
"streams": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "string"
}
],
"default": "suggested",
"description": "The streams to sync."
},
"override_execution_mode": {
"default": "auto",
"description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
"enum": [
"docker",
"python",
"yaml",
"auto"
],
"type": "string"
},
"manifest_path": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a local YAML manifest file for declarative connectors."
}
},
"required": [
"source_connector_name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
validate_connector_config
Hints: read-only · idempotent
Validate a connector configuration.
Returns a tuple of (is_valid: bool, message: str).
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
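The layering behavior described above (the non-secret `config` is loaded first, then the secret config referenced by `config_secret_name` is applied on top) can be sketched as a recursive dict merge. This is an assumption-laden illustration, not PyAirbyte's actual merge code, which may instead use a shallow overlay:

```python
def layer_configs(non_secret: dict, secret: dict) -> dict:
    """Overlay `secret` on top of `non_secret`, recursing into nested dicts
    so that secret values win on conflicts while non-secret keys survive."""
    merged = dict(non_secret)
    for key, value in secret.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = layer_configs(merged[key], value)
        else:
            merged[key] = value
    return merged


base = {"host": "db.example.com", "credentials": {"username": "reader"}}
secret = {"credentials": {"password": "hunter2"}}
print(layer_configs(base, secret))
# credentials now holds both the non-secret username and the secret password
```

The design point is that the secrets manager only needs to store the sensitive subset of the config; everything else can live in version-controlled YAML.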
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connector_name | string | yes | — | The name of the connector to validate. |
| config | object \| string \| null | no | null | The configuration for the connector as a dict object or JSON string. |
| config_file | string \| null | no | null | Path to a YAML or JSON file containing the connector configuration. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| override_execution_mode | enum("docker", "python", "yaml", "auto") | no | "auto" | Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used). |
| manifest_path | string \| null | no | null | Path to a local YAML manifest file for declarative connectors. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connector_name": {
"description": "The name of the connector to validate.",
"type": "string"
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The configuration for the connector as a dict object or JSON string."
},
"config_file": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a YAML or JSON file containing the connector configuration."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
"override_execution_mode": {
"default": "auto",
"description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
"enum": [
"docker",
"python",
"yaml",
"auto"
],
"type": "string"
},
"manifest_path": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a local YAML manifest file for declarative connectors."
}
},
"required": [
"connector_name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"maxItems": 2,
"minItems": 2,
"prefixItems": [
{
"type": "boolean"
},
{
"type": "string"
}
],
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Local MCP operations.

.. include:: ../../docs/mcp-generated/local.md
"""

# No public Python API — MCP primitives are registered via decorators and
# documented via the generated Markdown include above. Setting `__all__` to an
# empty list tells pdoc (and other doc tools) not to surface the individual
# tool / helper definitions as a redundant "API Documentation" list.
__all__: list[str] = []

import sys
import traceback
from itertools import islice
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Any, Literal

from fastmcp import FastMCP
from fastmcp_extensions import mcp_tool, register_mcp_tools
from pydantic import BaseModel, Field

from airbyte import get_source
from airbyte._util.destination_smoke_tests import (
    DestinationSmokeTestResult,
    run_destination_smoke_test,
)
from airbyte._util.meta import is_docker_installed
from airbyte.caches.util import get_default_cache
from airbyte.destinations.util import get_destination
from airbyte.mcp._arg_resolvers import resolve_connector_config, resolve_list_of_strings
from airbyte.registry import get_connector_metadata
from airbyte.secrets.config import _get_secret_sources
from airbyte.secrets.env_vars import DotenvSecretManager
from airbyte.secrets.google_gsm import GoogleGSMSecretManager
from airbyte.sources.base import Source


if TYPE_CHECKING:
    from airbyte.caches.duckdb import DuckDBCache


_CONFIG_HELP = """
You can provide `config` as JSON or a Path to a YAML/JSON file.
If a `dict` is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
`secret_reference::ENV_VAR_NAME`.

You can also provide a `config_secret_name` to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.

If `config_secret_name` is provided, it should point to a string
that contains valid JSON or YAML.

If both `config` and `config_secret_name` are provided, the
`config` will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.

For declarative connectors, you can provide a `manifest_path` to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
"""


def _get_mcp_source(
    connector_name: str,
    override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto",
    *,
    install_if_missing: bool = True,
    manifest_path: str | Path | None,
) -> Source:
    """Get the MCP source for a connector."""
    if manifest_path:
        override_execution_mode = "yaml"
    elif override_execution_mode == "auto" and is_docker_installed():
        override_execution_mode = "docker"

    source: Source
    if override_execution_mode == "auto":
        # Use defaults with no overrides
        source = get_source(
            connector_name,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "python":
        source = get_source(
            connector_name,
            use_python=True,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "docker":
        source = get_source(
            connector_name,
            docker_image=True,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "yaml":
        source = get_source(
            connector_name,
            source_manifest=manifest_path or True,
            install_if_missing=False,
        )
    else:
        raise ValueError(
            f"Unknown execution method: {override_execution_mode}. "
            "Expected one of: ['auto', 'docker', 'python', 'yaml']."
        )

    # Ensure installed:
    if install_if_missing:
        source.executor.ensure_installation()

    return source


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def validate_connector_config(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to validate."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the connector as a dict object or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the connector configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> tuple[bool, str]:
    """Validate a connector configuration.

    Returns a tuple of (is_valid: bool, message: str).
    """
    try:
        source: Source = _get_mcp_source(
            connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
    except Exception as ex:
        return False, f"Failed to get connector '{connector_name}': {ex}"

    try:
        config_dict = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
    except Exception as ex:
        return False, f"Failed to resolve configuration for {connector_name}: {ex}"

    try:
        source.check()
    except Exception as ex:
        return False, f"Configuration for {connector_name} is invalid: {ex}"

    return True, f"Configuration for {connector_name} is valid!"


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def list_connector_config_secrets(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector."),
    ],
) -> list[str]:
    """List all `config_secret_name` options that are known for the given connector.

    This can be used to find out which already-created config secret names are available
    for a given connector. The return value is a list of secret names, but it will not
    return the actual secret values.
    """
    secrets_names: list[str] = []
    for secrets_mgr in _get_secret_sources():
        if isinstance(secrets_mgr, GoogleGSMSecretManager):
            secrets_names.extend(
                [
                    secret_handle.secret_name.split("/")[-1]
                    for secret_handle in secrets_mgr.fetch_connector_secrets(connector_name)
                ]
            )

    return secrets_names


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_dotenv_secrets() -> dict[str, list[str]]:
    """List all environment variable names declared within the configured .env files.

    This returns a dictionary mapping the .env file name to a list of environment
    variable names. The values of the environment variables are not returned.
    """
    result: dict[str, list[str]] = {}
    for secrets_mgr in _get_secret_sources():
        if isinstance(secrets_mgr, DotenvSecretManager) and secrets_mgr.dotenv_path:
            result[str(secrets_mgr.dotenv_path.resolve())] = secrets_mgr.list_secrets_names()

    return result


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_source_streams(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[str]:
    """List all streams available in a source connector.

    This operation (generally) requires a valid configuration, including any required secrets.
    """
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    return source.get_available_streams()


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def get_source_stream_json_schema(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """List all properties for a specific stream in a source connector."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    return source.get_stream_json_schema(stream_name=stream_name)


@mcp_tool(
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def read_source_stream_records(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to read records from."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="The maximum number of records to read.",
            default=1000,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[dict[str, Any]] | str:
    """Get records from a source connector."""
    try:
        source: Source = _get_mcp_source(
            connector_name=source_connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
        config_dict = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
        # First we get a generator for the records in the specified stream.
        record_generator = source.get_records(stream_name)
        # Next we load a limited number of records from the generator into our list.
        records: list[dict[str, Any]] = list(islice(record_generator, max_records))

        print(f"Retrieved {len(records)} records from stream '{stream_name}'", file=sys.stderr)

    except Exception as ex:
        tb_str = traceback.format_exc()
        # If any error occurs, we return the error message (with traceback) as a string.
        return (
            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
        )

    else:
        return records


@mcp_tool(
    read_only=True,
    extra_help_text=_CONFIG_HELP,
)
def get_stream_previews(
    source_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str | None,
        Field(
            description=(
                "The streams to get previews for. "
                "Use '*' for all streams, or None for selected streams."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="The maximum number of sample records to return per stream.",
            default=10,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, list[dict[str, Any]] | str]:
    """Get sample records (previews) from streams in a source connector.

    This operation requires a valid configuration, including any required secrets.
    Returns a dictionary mapping stream names to lists of sample records, or an error
    message string if an error occurred for that stream.
    """
    source: Source = _get_mcp_source(
        connector_name=source_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )

    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)

    streams_param: list[str] | Literal["*"] | None = resolve_list_of_strings(
        streams
    )  # pyrefly: ignore[no-matching-overload]
    if streams_param and len(streams_param) == 1 and streams_param[0] == "*":
        streams_param = "*"

    try:
        samples_result = source.get_samples(
            streams=streams_param,
            limit=limit,
            on_error="ignore",
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return {
            "ERROR": f"Error getting stream previews from source '{source_name}': "
            f"{ex!r}, {ex!s}\n{tb_str}"
        }

    result: dict[str, list[dict[str, Any]] | str] = {}
    for stream_name, dataset in samples_result.items():
        if dataset is None:
            result[stream_name] = f"Could not retrieve stream samples for stream '{stream_name}'"
        else:
            result[stream_name] = list(dataset)

    return result


@mcp_tool(
    destructive=False,
    extra_help_text=_CONFIG_HELP,
)
def sync_source_to_cache(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str,
        Field(
            description="The streams to sync.",
            default="suggested",
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> str:
    """Run a sync from a source connector to the default DuckDB cache."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    cache = get_default_cache()

    streams = resolve_list_of_strings(streams)
    if streams and len(streams) == 1 and streams[0] in {"*", "suggested"}:
        # Float '*' and 'suggested' to the top-level for special processing:
        streams = streams[0]

    if isinstance(streams, str) and streams == "suggested":
        streams = "*"  # Default to all streams if 'suggested' is not otherwise specified.
        try:
            metadata = get_connector_metadata(
                source_connector_name,
            )
        except Exception:
            streams = "*"  # Fallback to all streams if suggested streams fail.
        else:
            if metadata is not None:
                streams = metadata.suggested_streams or "*"

    if isinstance(streams, str) and streams != "*":
        streams = [streams]  # Ensure streams is a list

    source.read(
        cache=cache,
        streams=streams,
    )
    del cache  # Ensure the cache is closed properly

    summary: str = f"Sync completed for '{source_connector_name}'!\n\n"
    summary += "Data written to default DuckDB cache\n"
    return summary


class CachedDatasetInfo(BaseModel):
    """Class to hold information about a cached dataset."""

    stream_name: str
    """The name of the stream in the cache."""
    table_name: str
    schema_name: str | None = None


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def list_cached_streams() -> list[CachedDatasetInfo]:
    """List all streams available in the default DuckDB cache."""
    cache: DuckDBCache = get_default_cache()
    result = [
        CachedDatasetInfo(
            stream_name=stream_name,
            table_name=(cache.table_prefix or "") + stream_name,
            schema_name=cache.schema_name,
        )
        for stream_name in cache.streams
    ]
    del cache  # Ensure the cache is closed properly
    return result


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def describe_default_cache() -> dict[str, Any]:
    """Describe the currently configured default cache."""
    cache = get_default_cache()
    return {
        "cache_type": type(cache).__name__,
        "cache_dir": str(cache.cache_dir),
        "cache_db_path": str(Path(cache.db_path).absolute()),
        "cached_streams": list(cache.streams.keys()),
    }


def _is_safe_sql(sql_query: str) -> bool:
    """Check if a SQL query is safe to execute.

    For security reasons, we only allow read-only operations like SELECT, DESCRIBE, and SHOW.
    Multi-statement queries (containing semicolons) are also disallowed for security.

    Note: SQLAlchemy will also validate downstream, but this is a first-pass check.

    Args:
        sql_query: The SQL query to check

    Returns:
        True if the query is safe to execute, False otherwise
    """
    # Remove leading/trailing whitespace and convert to uppercase for checking
    normalized_query = sql_query.strip().upper()

    # Disallow multi-statement queries (containing semicolons)
    # Note: We check the original query to catch semicolons anywhere, including in comments
    if ";" in sql_query:
        return False

    # List of allowed SQL statement prefixes (read-only operations)
    allowed_prefixes = (
        "SELECT",
        "DESCRIBE",
        "DESC",  # Short form of DESCRIBE
        "SHOW",
        "EXPLAIN",  # Also safe - shows query execution plan
    )

    # Check if the query starts with any allowed prefix
    return any(normalized_query.startswith(prefix) for prefix in allowed_prefixes)


@mcp_tool(
    read_only=True,
    idempotent=True,
    extra_help_text=_CONFIG_HELP,
)
def run_sql_query(
    sql_query: Annotated[
        str,
        Field(description="The SQL query to execute."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="Maximum number of records to return.",
            default=1000,
        ),
    ],
) -> list[dict[str, Any]]:
    """Run a SQL query against the default cache.

    The dialect of SQL should match the dialect of the default cache.
    Use `describe_default_cache` to see the cache type.

    For DuckDB-type caches:
    - Use `SHOW TABLES` to list all tables.
    - Use `DESCRIBE <table_name>` to get the schema of a specific table.

    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
    """
    # Check if the query is safe to execute
    if not _is_safe_sql(sql_query):
        return [
            {
                "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
                "SELECT, DESCRIBE, SHOW, EXPLAIN",
                "SQL_QUERY": sql_query,
            }
        ]

    cache: DuckDBCache = get_default_cache()
    try:
        return cache.run_sql_query(
            sql_query,
            max_records=max_records,
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return [
            {
                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
                "TRACEBACK": tb_str,
                "SQL_QUERY": sql_query,
            }
        ]
    finally:
        del cache  # Ensure the cache is closed properly


@mcp_tool(
    destructive=True,
)
def destination_smoke_test(  # noqa: PLR0913, PLR0917
    destination_connector_name: Annotated[
        str,
        Field(
            description=(
                "The name of the destination connector to test "
                "(e.g. 'destination-snowflake', 'destination-motherduck')."
            ),
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description=(
                "The destination configuration as a dict object or JSON string. "
                "Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead."
            ),
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the destination configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the destination configuration.",
            default=None,
        ),
    ],
    scenarios: Annotated[
        list[str] | str,
        Field(
            description=(
                "Which scenarios to run. Use 'fast' (default) for all fast predefined "
                "scenarios (excludes large_batch_stream), 'all' for every predefined "
                "scenario including large batch, or provide a list of scenario names "
                "or a comma-separated string."
            ),
            default="fast",
        ),
    ],
    custom_scenarios: Annotated[
        list[dict[str, Any]] | None,
        Field(
            description=(
                "Additional custom test scenarios to inject. Each scenario should define "
                "'name', 'json_schema', and optionally 'records' and 'primary_key'. "
                "These are unioned with the predefined scenarios."
            ),
            default=None,
        ),
    ],
    docker_image: Annotated[
        str | None,
        Field(
            description=(
                "Optional Docker image override for the destination connector "
                "(e.g. 'airbyte/destination-snowflake:3.14.0')."
            ),
            default=None,
        ),
    ],
    namespace_suffix: Annotated[
        str | None,
        Field(
            description=(
                "Optional suffix appended to the auto-generated namespace. "
                "Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). "
                "Use this to distinguish concurrent runs."
            ),
            default=None,
        ),
    ],
    reuse_namespace: Annotated[
        str | None,
        Field(
            description=(
                "Exact namespace to reuse from a previous run. "
                "When set, no new namespace is generated. "
                "Useful for running a second test against an already-populated namespace."
            ),
            default=None,
        ),
    ],
    skip_preflight: Annotated[
        bool,
        Field(
            description=(
                "Skip the automatic preflight check that runs basic_types before "
                "the requested scenarios. Set to true when you expect basic_types "
                "itself to fail or want to save time on repeated runs."
            ),
            default=False,
        ),
    ],
) -> DestinationSmokeTestResult:
    """Run smoke tests against a destination connector.

    Sends synthetic test data from the smoke test source to the specified
    destination and reports success or failure. The smoke test source generates
    data across predefined scenarios covering common destination failure patterns:
    type variations, null handling, naming edge cases, schema variations, and
    batch sizes.

    When the destination has a compatible cache implementation (DuckDB,
    Postgres, Snowflake, BigQuery, MotherDuck), readback introspection is
    automatically performed after a successful write. The readback produces
    stats on the written data: table row counts, column names/types, and
    per-column null/non-null counts. Results are included in the response
    as `table_statistics` and `tables_not_found`.
    """
    # Resolve destination config
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
    )

    # Set up destination
    destination_kwargs: dict[str, Any] = {
        "name": destination_connector_name,
        "config": config_dict,
    }
    if docker_image:
        destination_kwargs["docker_image"] = docker_image
    elif is_docker_installed():
        destination_kwargs["docker_image"] = True

    destination_obj = get_destination(**destination_kwargs)

    # Resolve scenarios for the shared helper
    resolved_scenarios: str | list[str]
    if isinstance(scenarios, str):
        resolved_scenarios = scenarios
    else:
        resolved_scenarios = resolve_list_of_strings(scenarios) or "fast"

    return run_destination_smoke_test(
        destination=destination_obj,
        scenarios=resolved_scenarios,
        namespace_suffix=namespace_suffix,
        reuse_namespace=reuse_namespace,
        custom_scenarios=custom_scenarios,
        skip_preflight=skip_preflight,
    )


def register_local_tools(app: FastMCP) -> None:
    """Register local tools with the FastMCP app.

    Args:
        app: FastMCP application instance
    """
    register_mcp_tools(app, mcp_module=__name__)