airbyte.mcp.local
Local MCP operations.
local module
MCP primitives registered by the local module of the airbyte-mcp server: 12 tools, 0 prompts, 0 resources.
Tools (12)
describe_default_cache
Hints: read-only · idempotent
Describe the currently configured default cache.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
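The `secret_reference::ENV_VAR_NAME` convention described above can be sketched in plain Python. This is an illustration only, not the server's implementation: `resolve_secret_references` is a hypothetical helper name, and failing with `KeyError` on an unset variable is an assumption.

```python
import os
from typing import Any

SECRET_PREFIX = "secret_reference::"


def resolve_secret_references(value: Any) -> Any:
    """Recursively replace 'secret_reference::ENV_VAR_NAME' strings with env values.

    Hypothetical helper for illustration; the real server may resolve
    references differently (for example through a secrets manager).
    """
    if isinstance(value, dict):
        return {key: resolve_secret_references(item) for key, item in value.items()}
    if isinstance(value, list):
        return [resolve_secret_references(item) for item in value]
    if isinstance(value, str) and value.startswith(SECRET_PREFIX):
        env_var_name = value[len(SECRET_PREFIX):]
        # Assumption: an unset variable should fail loudly (KeyError).
        return os.environ[env_var_name]
    return value


# Usage: the config carries only a reference, never the secret itself.
os.environ["EXAMPLE_API_KEY"] = "not-a-real-key"
config = {"start_date": "2024-01-01", "api_key": "secret_reference::EXAMPLE_API_KEY"}
resolved = resolve_secret_references(config)
print(sorted(resolved))
```

Keeping the reference string in the config means the dict can be logged or passed through an MCP client without exposing the secret value.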
Parameters
_No parameters._
Show input JSON schema
{
"additionalProperties": false,
"properties": {},
"type": "object"
}
Show output JSON schema
{
"additionalProperties": true,
"type": "object"
}
destination_smoke_test
Hints: destructive
Run smoke tests against a destination connector.
Sends synthetic test data from the smoke test source to the specified destination and reports success or failure. The smoke test source generates data across predefined scenarios covering common destination failure patterns: type variations, null handling, naming edge cases, schema variations, and batch sizes.
When the destination has a compatible cache implementation (DuckDB,
Postgres, Snowflake, BigQuery, MotherDuck), readback introspection is
automatically performed after a successful write. The readback produces
stats on the written data: table row counts, column names/types, and
per-column null/non-null counts. Results are included in the response
as table_statistics and tables_not_found.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| destination_connector_name | string | yes | — | The name of the destination connector to test (e.g. 'destination-snowflake', 'destination-motherduck'). |
| config | object \| string \| null | no | null | The destination configuration as a dict object or JSON string. Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead. |
| config_file | string \| path \| null | no | null | Path to a YAML or JSON file containing the destination configuration. |
| config_secret_name | string \| null | no | null | The name of the secret containing the destination configuration. |
| scenarios | array<string> \| string | no | "fast" | Which scenarios to run. Use 'fast' (default) for all fast predefined scenarios (excludes large_batch_stream), 'all' for every predefined scenario including large batch, or provide a list of scenario names or a comma-separated string. |
| custom_scenarios | array<object> \| null | no | null | Additional custom test scenarios to inject. Each scenario should define 'name', 'json_schema', and optionally 'records' and 'primary_key'. These are unioned with the predefined scenarios. |
| docker_image | string \| null | no | null | Optional Docker image override for the destination connector (e.g. 'airbyte/destination-snowflake:3.14.0'). |
| namespace_suffix | string \| null | no | null | Optional suffix appended to the auto-generated namespace. Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). Use this to distinguish concurrent runs. |
| reuse_namespace | string \| null | no | null | Exact namespace to reuse from a previous run. When set, no new namespace is generated. Useful for running a second test against an already-populated namespace. |
| skip_preflight | boolean | no | false | Skip the automatic preflight check that runs basic_types before the requested scenarios. Set to true when you expect basic_types itself to fail or want to save time on repeated runs. |
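To make the `namespace_suffix` and `scenarios` descriptions concrete, here is a minimal sketch of how the documented namespace format and the accepted `scenarios` shapes could be handled. `build_namespace` and `normalize_scenarios` are hypothetical names, and using UTC for the timestamp is an assumption, since the tool description does not name a timezone.

```python
from datetime import datetime, timezone
from typing import Optional, Union


def build_namespace(suffix: Optional[str] = None, now: Optional[datetime] = None) -> str:
    """Build a namespace in the documented 'zz_deleteme_yyyymmdd_hhmm_{suffix}' format."""
    # Assumption: UTC clock; the tool description does not say which timezone is used.
    now = now or datetime.now(timezone.utc)
    return f"zz_deleteme_{now:%Y%m%d_%H%M}_{suffix or 'smoke_test'}"


def normalize_scenarios(scenarios: Union[str, list[str]] = "fast") -> Union[str, list[str]]:
    """Return the 'fast'/'all' keywords unchanged, otherwise a clean list of names."""
    if isinstance(scenarios, str):
        if scenarios in ("fast", "all"):
            return scenarios
        # A comma-separated string becomes a list of trimmed, non-empty names.
        return [name.strip() for name in scenarios.split(",") if name.strip()]
    return list(scenarios)


print(build_namespace("run_b", now=datetime(2025, 12, 31, 23, 59)))
print(normalize_scenarios("basic_types, large_batch_stream"))
```

Passing a distinct suffix per run is what lets concurrent smoke tests write to separate namespaces.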
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"destination_connector_name": {
"description": "The name of the destination connector to test (e.g. 'destination-snowflake', 'destination-motherduck').",
"type": "string"
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The destination configuration as a dict object or JSON string. Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead."
},
"config_file": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a YAML or JSON file containing the destination configuration."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the destination configuration."
},
"scenarios": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "string"
}
],
"default": "fast",
"description": "Which scenarios to run. Use 'fast' (default) for all fast predefined scenarios (excludes large_batch_stream), 'all' for every predefined scenario including large batch, or provide a list of scenario names or a comma-separated string."
},
"custom_scenarios": {
"anyOf": [
{
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "Additional custom test scenarios to inject. Each scenario should define 'name', 'json_schema', and optionally 'records' and 'primary_key'. These are unioned with the predefined scenarios."
},
"docker_image": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional Docker image override for the destination connector (e.g. 'airbyte/destination-snowflake:3.14.0')."
},
"namespace_suffix": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional suffix appended to the auto-generated namespace. Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). Use this to distinguish concurrent runs."
},
"reuse_namespace": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Exact namespace to reuse from a previous run. When set, no new namespace is generated. Useful for running a second test against an already-populated namespace."
},
"skip_preflight": {
"default": false,
"description": "Skip the automatic preflight check that runs basic_types before the requested scenarios. Set to true when you expect basic_types itself to fail or want to save time on repeated runs.",
"type": "boolean"
}
},
"required": [
"destination_connector_name"
],
"type": "object"
}
Show output JSON schema
{
"description": "Result of a destination smoke test run.",
"properties": {
"success": {
"type": "boolean"
},
"destination": {
"type": "string"
},
"namespace": {
"type": "string"
},
"records_delivered": {
"type": "integer"
},
"scenarios_requested": {
"type": "string"
},
"elapsed_seconds": {
"type": "number"
},
"error": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"preflight_passed": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null
},
"table_statistics": {
"anyOf": [
{
"additionalProperties": {
"description": "Statistics for a single table: row count, column info, and per-column stats.",
"properties": {
"table_name": {
"type": "string"
},
"database_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"schema_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"row_count": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null
},
"column_statistics": {
"items": {
"description": "Null/non-null statistics for a single column.",
"properties": {
"column_name": {
"type": "string"
},
"column_type": {
"type": "string"
},
"null_count": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null
},
"non_null_count": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null
},
"total_count": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null
}
},
"required": [
"column_name",
"column_type"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"table_name",
"column_statistics"
],
"type": "object"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null
},
"tables_not_found": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null
},
"warnings": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null
}
},
"required": [
"success",
"destination",
"namespace",
"records_delivered",
"scenarios_requested",
"elapsed_seconds"
],
"type": "object"
}
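The output schema above can be consumed as a plain dictionary. The sketch below summarizes a result into readable lines; `summarize_smoke_test` is a hypothetical helper, the sample values are made up, and reading `tables_not_found` as a name-to-reason mapping is an assumption based only on its string-to-string type.

```python
from typing import Any


def summarize_smoke_test(result: dict[str, Any]) -> list[str]:
    """Turn a smoke test result dict into short human-readable lines."""
    lines = [f"success={result['success']} records_delivered={result['records_delivered']}"]
    # table_statistics and tables_not_found are nullable, so fall back to {}.
    for key, table in (result.get("table_statistics") or {}).items():
        lines.append(f"{key}: rows={table.get('row_count')}")
        for column in table["column_statistics"]:
            lines.append(
                f"  {column['column_name']} ({column['column_type']}): "
                f"nulls={column.get('null_count')}"
            )
    for key, reason in (result.get("tables_not_found") or {}).items():
        lines.append(f"{key}: not found ({reason})")
    return lines


# Made-up sample shaped like the output schema.
sample_result = {
    "success": True,
    "destination": "destination-motherduck",
    "namespace": "zz_deleteme_20250102_0304_smoke_test",
    "records_delivered": 3,
    "scenarios_requested": "fast",
    "elapsed_seconds": 1.5,
    "table_statistics": {
        "basic_types": {
            "table_name": "basic_types",
            "row_count": 3,
            "column_statistics": [
                {"column_name": "id", "column_type": "BIGINT", "null_count": 0},
            ],
        }
    },
    "tables_not_found": {"missing_stream": "table does not exist"},
}
print("\n".join(summarize_smoke_test(sample_result)))
```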
get_source_stream_json_schema
Hints: read-only · idempotent
List all properties for a specific stream in a source connector.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
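The layering rule described above (the `config` is loaded first, then the secret config is applied on top) might look like the following sketch. `parse_config` and `layer_configs` are hypothetical names, only JSON input is handled here, and the shallow top-level merge is an assumption about what "layered on top" means.

```python
import json
from typing import Any, Union

ConfigInput = Union[dict[str, Any], str, None]


def parse_config(config: ConfigInput) -> dict[str, Any]:
    """Accept a dict, a JSON string, or None and always return a dict."""
    if config is None:
        return {}
    if isinstance(config, str):
        parsed = json.loads(config)
        if not isinstance(parsed, dict):
            raise ValueError("config JSON must decode to an object")
        return parsed
    return dict(config)


def layer_configs(config: ConfigInput, secret_config: ConfigInput) -> dict[str, Any]:
    """Load the non-secret config first, then let secret values win on conflicts."""
    merged = parse_config(config)
    merged.update(parse_config(secret_config))  # Assumed shallow (top-level) merge.
    return merged


print(layer_configs('{"host": "example.internal", "port": 5432}', {"password": "from-secret-store"}))
```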
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_connector_name | string | yes | — | The name of the source connector. |
| stream_name | string | yes | — | The name of the stream. |
| config | object \| string \| null | no | null | The configuration for the source connector as a dict or JSON string. |
| config_file | string \| path \| null | no | null | Path to a YAML or JSON file containing the source connector config. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| override_execution_mode | enum("docker", "python", "yaml", "auto") | no | "auto" | Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used). |
| manifest_path | string \| path \| null | no | null | Path to a local YAML manifest file for declarative connectors. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_connector_name": {
"description": "The name of the source connector.",
"type": "string"
},
"stream_name": {
"description": "The name of the stream.",
"type": "string"
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The configuration for the source connector as a dict or JSON string."
},
"config_file": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a YAML or JSON file containing the source connector config."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
"override_execution_mode": {
"default": "auto",
"description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
"enum": [
"docker",
"python",
"yaml",
"auto"
],
"type": "string"
},
"manifest_path": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a local YAML manifest file for declarative connectors."
}
},
"required": [
"source_connector_name",
"stream_name"
],
"type": "object"
}
Show output JSON schema
{
"additionalProperties": true,
"type": "object"
}
get_stream_previews
Hints: read-only
Get sample records (previews) from streams in a source connector.
This operation requires a valid configuration, including any required secrets.
Returns a dictionary that maps each stream name to either a list of sample records or,
if that stream failed, an error message string.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_name | string | yes | — | The name of the source connector. |
| config | object \| string \| null | no | null | The configuration for the source connector as a dict or JSON string. |
| config_file | string \| path \| null | no | null | Path to a YAML or JSON file containing the source connector config. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| streams | array<string> \| string \| null | no | null | The streams to get previews for. Use '*' for all streams, or None for selected streams. |
| limit | integer | no | 10 | The maximum number of sample records to return per stream. |
| override_execution_mode | enum("docker", "python", "yaml", "auto") | no | "auto" | Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used). |
| manifest_path | string \| path \| null | no | null | Path to a local YAML manifest file for declarative connectors. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_name": {
"description": "The name of the source connector.",
"type": "string"
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The configuration for the source connector as a dict or JSON string."
},
"config_file": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a YAML or JSON file containing the source connector config."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
"streams": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The streams to get previews for. Use '*' for all streams, or None for selected streams."
},
"limit": {
"default": 10,
"description": "The maximum number of sample records to return per stream.",
"type": "integer"
},
"override_execution_mode": {
"default": "auto",
"description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
"enum": [
"docker",
"python",
"yaml",
"auto"
],
"type": "string"
},
"manifest_path": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a local YAML manifest file for declarative connectors."
}
},
"required": [
"source_name"
],
"type": "object"
}
Show output JSON schema
{
"additionalProperties": {
"anyOf": [
{
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
},
{
"type": "string"
}
]
},
"type": "object"
}
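Because each value in this output is either a list of records or an error string, a caller usually has to branch on the value type. A minimal sketch, with `split_previews` as a hypothetical helper and made-up sample data:

```python
from typing import Any, Union

PreviewResult = dict[str, Union[list[dict[str, Any]], str]]


def split_previews(
    previews: PreviewResult,
) -> tuple[dict[str, list[dict[str, Any]]], dict[str, str]]:
    """Separate streams that returned sample records from streams that returned an error."""
    records: dict[str, list[dict[str, Any]]] = {}
    errors: dict[str, str] = {}
    for stream_name, value in previews.items():
        if isinstance(value, str):
            errors[stream_name] = value  # A string value is an error message.
        else:
            records[stream_name] = value
    return records, errors


# Made-up response shaped like the output schema.
ok, failed = split_previews({"users": [{"id": 1}], "orders": "connection refused"})
print(ok)
print(failed)
```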
list_cached_streams
Hints: read-only · idempotent
List all streams available in the default DuckDB cache.
Parameters
_No parameters._
Show input JSON schema
{
"additionalProperties": false,
"properties": {},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"description": "Class to hold information about a cached dataset.",
"properties": {
"stream_name": {
"type": "string"
},
"table_name": {
"type": "string"
},
"schema_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
}
},
"required": [
"stream_name",
"table_name"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
list_connector_config_secrets
Hints: read-only · idempotent
List all config_secret_name options that are known for the given connector.
This can be used to find out which already-created config secret names are available for a given connector. The return value is a list of secret names, but it will not return the actual secret values.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connector_name | string | yes | — | The name of the connector. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connector_name": {
"description": "The name of the connector.",
"type": "string"
}
},
"required": [
"connector_name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"type": "string"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
list_dotenv_secrets
Hints: read-only · idempotent
List all environment variable names defined in the declared .env files.
This returns a dictionary mapping the .env file name to a list of environment
variable names. The values of the environment variables are not returned.
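The names-only behavior described above can be illustrated with a simplified parser. This is a sketch under stated assumptions, not the server's parser: `list_env_var_names` is a hypothetical helper that handles only `NAME=value` and `export NAME=value` lines, and the file name in the usage is made up.

```python
def list_env_var_names(dotenv_text: str) -> list[str]:
    """Return the variable names declared in .env-style text, never the values."""
    names = []
    for raw_line in dotenv_text.splitlines():
        line = raw_line.strip()
        # Skip blanks, comments, and lines that do not assign anything.
        if not line or line.startswith("#") or "=" not in line:
            continue
        if line.startswith("export "):
            line = line[len("export "):]
        # Everything before the first '=' is the name; the value is discarded.
        names.append(line.split("=", 1)[0].strip())
    return names


sample_text = "# comment\nAPI_KEY=abc123\nexport DB_PASSWORD='x=y'\n\nEMPTY=\n"
# Mirrors the documented shape: .env file name -> list of variable names.
print({"sample.env": list_env_var_names(sample_text)})
```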
Parameters
_No parameters._
Show input JSON schema
{
"additionalProperties": false,
"properties": {},
"type": "object"
}
Show output JSON schema
{
"additionalProperties": {
"items": {
"type": "string"
},
"type": "array"
},
"type": "object"
}
list_source_streams
Hints: read-only · idempotent
List all streams available in a source connector.
This operation (generally) requires a valid configuration, including any required secrets.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_connector_name | string | yes | — | The name of the source connector. |
| config | object \| string \| null | no | null | The configuration for the source connector as a dict or JSON string. |
| config_file | string \| path \| null | no | null | Path to a YAML or JSON file containing the source connector config. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| override_execution_mode | enum("docker", "python", "yaml", "auto") | no | "auto" | Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used). |
| manifest_path | string \| path \| null | no | null | Path to a local YAML manifest file for declarative connectors. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_connector_name": {
"description": "The name of the source connector.",
"type": "string"
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The configuration for the source connector as a dict or JSON string."
},
"config_file": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a YAML or JSON file containing the source connector config."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
"override_execution_mode": {
"default": "auto",
"description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
"enum": [
"docker",
"python",
"yaml",
"auto"
],
"type": "string"
},
"manifest_path": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a local YAML manifest file for declarative connectors."
}
},
"required": [
"source_connector_name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"type": "string"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
read_source_stream_records
Hints: read-only
Get records from a source connector.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_connector_name | string | yes | — | The name of the source connector. |
| config | object \| string \| null | no | null | The configuration for the source connector as a dict or JSON string. |
| config_file | string \| path \| null | no | null | Path to a YAML or JSON file containing the source connector config. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| stream_name | string | yes | — | The name of the stream to read records from. |
| max_records | integer | no | 1000 | The maximum number of records to read. |
| override_execution_mode | enum("docker", "python", "yaml", "auto") | no | "auto" | Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used). |
| manifest_path | string \| path \| null | no | null | Path to a local YAML manifest file for declarative connectors. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_connector_name": {
"description": "The name of the source connector.",
"type": "string"
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The configuration for the source connector as a dict or JSON string."
},
"config_file": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a YAML or JSON file containing the source connector config."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
"stream_name": {
"description": "The name of the stream to read records from.",
"type": "string"
},
"max_records": {
"default": 1000,
"description": "The maximum number of records to read.",
"type": "integer"
},
"override_execution_mode": {
"default": "auto",
"description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
"enum": [
"docker",
"python",
"yaml",
"auto"
],
"type": "string"
},
"manifest_path": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a local YAML manifest file for declarative connectors."
}
},
"required": [
"source_connector_name",
"stream_name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"anyOf": [
{
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
},
{
"type": "string"
}
]
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
run_sql_query
Hints: read-only · idempotent
Run a SQL query against the default cache.
The dialect of SQL should match the dialect of the default cache.
Use `describe_default_cache` to see the cache type.
For DuckDB-type caches:
- Use `SHOW TABLES` to list all tables.
- Use `DESCRIBE <table_name>` to get the schema of a specific table.
For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
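A client can pre-check a query against the allowed statement types before sending it. The sketch below is a naive first-keyword check and only an assumption about how such a guard could work: `is_read_only_query` is a hypothetical helper, it does not detect multi-statement input such as `SELECT 1; DROP TABLE t`, and the server's own validation may be stricter.

```python
import re

READ_ONLY_KEYWORDS = {"SELECT", "DESCRIBE", "SHOW", "EXPLAIN"}


def is_read_only_query(sql_query: str) -> bool:
    """Return True when the first keyword is one of the allowed read-only statements."""
    match = re.match(r"\s*([A-Za-z]+)", sql_query)
    if match is None:
        return False
    return match.group(1).upper() in READ_ONLY_KEYWORDS


for query in ("SHOW TABLES", "select * from users", "DROP TABLE users"):
    print(query, "->", is_read_only_query(query))
```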
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| sql_query | string | yes | — | The SQL query to execute. |
| max_records | integer | no | 1000 | Maximum number of records to return. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"sql_query": {
"description": "The SQL query to execute.",
"type": "string"
},
"max_records": {
"default": 1000,
"description": "Maximum number of records to return.",
"type": "integer"
}
},
"required": [
"sql_query"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
sync_source_to_cache
Run a sync from a source connector to the default DuckDB cache.
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_connector_name | `string` | yes | — | The name of the source connector. |
| config | `object \| string \| null` | no | null | The configuration for the source connector as a dict or JSON string. |
| config_file | `string \| string (path) \| null` | no | null | Path to a YAML or JSON file containing the source connector config. |
| config_secret_name | `string \| null` | no | null | The name of the secret containing the configuration. |
| streams | `array<string> \| string` | no | "suggested" | The streams to sync. |
| override_execution_mode | `enum("docker", "python", "yaml", "auto")` | no | "auto" | Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used). |
| manifest_path | `string \| string (path) \| null` | no | null | Path to a local YAML manifest file for declarative connectors. |
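The `streams` parameter accepts a list, a single name, `*`, or the default `suggested`. The sketch below loosely follows the handling shown in the module source for this tool. `normalize_streams` and its `suggested` argument are hypothetical names, and the comma-splitting of string input is an assumption about what `resolve_list_of_strings` does.

```python
from typing import List, Optional, Sequence, Union


def normalize_streams(
    streams: Union[str, Sequence[str]],
    suggested: Optional[Sequence[str]] = None,
) -> Union[str, List[str]]:
    """Return "*" for all streams, or an explicit list of stream names."""
    if isinstance(streams, str):
        # Assumption: a string may carry several comma-separated names.
        names = [name.strip() for name in streams.split(",") if name.strip()]
    else:
        names = list(streams)

    if len(names) == 1 and names[0] == "*":
        return "*"
    if len(names) == 1 and names[0] == "suggested":
        # Use the connector's suggested streams when known, else fall back to all streams.
        return list(suggested) if suggested else "*"
    return names


if __name__ == "__main__":
    print(normalize_streams("suggested", suggested=["users", "products"]))
    print(normalize_streams("users, purchases"))
```

In the listed source the suggested streams come from the connector's registry metadata, and a failed metadata lookup also falls back to `*`.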
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_connector_name": {
"description": "The name of the source connector.",
"type": "string"
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The configuration for the source connector as a dict or JSON string."
},
"config_file": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a YAML or JSON file containing the source connector config."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
"streams": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "string"
}
],
"default": "suggested",
"description": "The streams to sync."
},
"override_execution_mode": {
"default": "auto",
"description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
"enum": [
"docker",
"python",
"yaml",
"auto"
],
"type": "string"
},
"manifest_path": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a local YAML manifest file for declarative connectors."
}
},
"required": [
"source_connector_name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
validate_connector_config
Hints: read-only · idempotent
Validate a connector configuration.
Returns a tuple of (is_valid: bool, message: str).
You can provide config as JSON or a Path to a YAML/JSON file.
If a dict is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
secret_reference::ENV_VAR_NAME.
You can also provide a config_secret_name to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.
If config_secret_name is provided, it should point to a string
that contains valid JSON or YAML.
If both config and config_secret_name are provided, the
config will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.
For declarative connectors, you can provide a manifest_path to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connector_name | `string` | yes | — | The name of the connector to validate. |
| config | `object \| string \| null` | no | null | The configuration for the connector as a dict object or JSON string. |
| config_file | `string \| string (path) \| null` | no | null | Path to a YAML or JSON file containing the connector configuration. |
| config_secret_name | `string \| null` | no | null | The name of the secret containing the configuration. |
| override_execution_mode | `enum("docker", "python", "yaml", "auto")` | no | "auto" | Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used). |
| manifest_path | `string \| string (path) \| null` | no | null | Path to a local YAML manifest file for declarative connectors. |
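The interaction between `override_execution_mode` and `manifest_path` can be summarized in a few lines. This is a simplified sketch of the precedence visible in the module's `_get_mcp_source` helper; `resolve_execution_mode` is a hypothetical name, and the `docker_available` flag stands in for the module's Docker detection.

```python
from typing import Optional


def resolve_execution_mode(
    requested: str,
    manifest_path: Optional[str],
    docker_available: bool,
) -> str:
    """Return the execution mode that would take effect for the given inputs."""
    if manifest_path:
        # A local manifest always forces YAML (declarative) execution.
        return "yaml"
    if requested == "auto" and docker_available:
        # With no explicit override, Docker is preferred when it is installed.
        return "docker"
    return requested


if __name__ == "__main__":
    print(resolve_execution_mode("docker", "my_manifest.yaml", docker_available=True))
    print(resolve_execution_mode("auto", None, docker_available=False))
```

When the result is still `auto`, the listed source calls `get_source` with no execution override and leaves the choice to its defaults.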
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connector_name": {
"description": "The name of the connector to validate.",
"type": "string"
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The configuration for the connector as a dict object or JSON string."
},
"config_file": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a YAML or JSON file containing the connector configuration."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
"override_execution_mode": {
"default": "auto",
"description": "Optionally override the execution method to use for the connector. This parameter is ignored if manifest_path is provided (yaml mode will be used).",
"enum": [
"docker",
"python",
"yaml",
"auto"
],
"type": "string"
},
"manifest_path": {
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to a local YAML manifest file for declarative connectors."
}
},
"required": [
"connector_name"
],
"type": "object"
}
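Because the input schema sets `additionalProperties` to `false` and requires only `connector_name`, a client can catch malformed calls before sending them. The following is a minimal sketch of such a pre-flight check, not a full JSON Schema validator; `check_arguments` is a hypothetical helper, and the `source-faker` payload is only an illustrative example.

```python
from typing import Any, Dict, List

ALLOWED_ARGUMENTS = {
    "connector_name",
    "config",
    "config_file",
    "config_secret_name",
    "override_execution_mode",
    "manifest_path",
}
REQUIRED_ARGUMENTS = {"connector_name"}
EXECUTION_MODES = {"docker", "python", "yaml", "auto"}


def check_arguments(arguments: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a validate_connector_config payload."""
    problems: List[str] = []
    for name in sorted(REQUIRED_ARGUMENTS - arguments.keys()):
        problems.append(f"missing required argument: {name}")
    for name in sorted(arguments.keys() - ALLOWED_ARGUMENTS):
        problems.append(f"unexpected argument: {name}")
    mode = arguments.get("override_execution_mode", "auto")
    if mode not in EXECUTION_MODES:
        problems.append(f"invalid override_execution_mode: {mode!r}")
    return problems


if __name__ == "__main__":
    payload = {"connector_name": "source-faker", "config": {"count": 100}}
    print(check_arguments(payload))
```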
Show output JSON schema
{
"properties": {
"result": {
"maxItems": 2,
"minItems": 2,
"prefixItems": [
{
"type": "boolean"
},
{
"type": "string"
}
],
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
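The output schema wraps the `(is_valid, message)` tuple in a `result` array of exactly two items. A client-side sketch for unpacking it might look like the following; `unwrap_validation_result` is a hypothetical helper and the payloads are made-up examples of the documented shape.

```python
from typing import Any, Dict, Tuple


def unwrap_validation_result(payload: Dict[str, Any]) -> Tuple[bool, str]:
    """Extract (is_valid, message) from a wrapped validate_connector_config result."""
    result = payload["result"]
    if not isinstance(result, (list, tuple)) or len(result) != 2:
        raise ValueError("expected 'result' to be an array of exactly two items")
    is_valid, message = result
    if not isinstance(is_valid, bool) or not isinstance(message, str):
        raise TypeError("expected 'result' to be [boolean, string]")
    return is_valid, message


if __name__ == "__main__":
    ok, message = unwrap_validation_result(
        {"result": [True, "Configuration for source-faker is valid!"]}
    )
    print(ok, message)
```

Note that in the listed source a failed check returns `False` with an explanatory message instead of raising, so callers should branch on the boolean.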
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Local MCP operations.

.. include:: ../../docs/mcp-generated/local.md
"""

# No public Python API — MCP primitives are registered via decorators and
# documented via the generated Markdown include above. Setting `__all__` to an
# empty list tells pdoc (and other doc tools) not to surface the individual
# tool / helper definitions as a redundant "API Documentation" list.
__all__: list[str] = []

import sys
import traceback
from itertools import islice
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Any, Literal

from fastmcp import FastMCP
from fastmcp_extensions import mcp_tool, register_mcp_tools
from pydantic import BaseModel, Field

from airbyte import get_source
from airbyte._util.destination_smoke_tests import (
    DestinationSmokeTestResult,
    run_destination_smoke_test,
)
from airbyte._util.meta import is_docker_installed
from airbyte.caches.util import get_default_cache
from airbyte.destinations.util import get_destination
from airbyte.mcp._arg_resolvers import resolve_connector_config, resolve_list_of_strings
from airbyte.registry import get_connector_metadata
from airbyte.secrets.config import _get_secret_sources
from airbyte.secrets.env_vars import DotenvSecretManager
from airbyte.secrets.google_gsm import GoogleGSMSecretManager
from airbyte.sources.base import Source


if TYPE_CHECKING:
    from airbyte.caches.duckdb import DuckDBCache


_CONFIG_HELP = """
You can provide `config` as JSON or a Path to a YAML/JSON file.
If a `dict` is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
and the config should reference them using the format
`secret_reference::ENV_VAR_NAME`.

You can also provide a `config_secret_name` to use a specific
secret name for the configuration. This is useful if you want to
validate a configuration that is stored in a secrets manager.

If `config_secret_name` is provided, it should point to a string
that contains valid JSON or YAML.

If both `config` and `config_secret_name` are provided, the
`config` will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.

For declarative connectors, you can provide a `manifest_path` to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
"""


def _get_mcp_source(
    connector_name: str,
    override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto",
    *,
    install_if_missing: bool = True,
    manifest_path: str | Path | None,
) -> Source:
    """Get the MCP source for a connector."""
    if manifest_path:
        override_execution_mode = "yaml"
    elif override_execution_mode == "auto" and is_docker_installed():
        override_execution_mode = "docker"

    source: Source
    if override_execution_mode == "auto":
        # Use defaults with no overrides
        source = get_source(
            connector_name,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "python":
        source = get_source(
            connector_name,
            use_python=True,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "docker":
        source = get_source(
            connector_name,
            docker_image=True,
            install_if_missing=False,
            source_manifest=manifest_path or None,
        )
    elif override_execution_mode == "yaml":
        source = get_source(
            connector_name,
            source_manifest=manifest_path or True,
            install_if_missing=False,
        )
    else:
        raise ValueError(
            f"Unknown execution method: {override_execution_mode}. "
            "Expected one of: ['auto', 'docker', 'python', 'yaml']."
        )

    # Ensure installed:
    if install_if_missing:
        source.executor.ensure_installation()

    return source


@mcp_tool(
    read_only=True,
    idempotent=True,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def validate_connector_config(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to validate."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the connector as a dict object or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the connector configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> tuple[bool, str]:
    """Validate a connector configuration.

    Returns a tuple of (is_valid: bool, message: str).
    """
    try:
        source: Source = _get_mcp_source(
            connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
    except Exception as ex:
        return False, f"Failed to get connector '{connector_name}': {ex}"

    try:
        config_dict = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
    except Exception as ex:
        return False, f"Failed to resolve configuration for {connector_name}: {ex}"

    try:
        source.check()
    except Exception as ex:
        return False, f"Configuration for {connector_name} is invalid: {ex}"

    return True, f"Configuration for {connector_name} is valid!"


@mcp_tool(
    read_only=True,
    idempotent=True,
    requires_client_filesystem=True,
)
def list_connector_config_secrets(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector."),
    ],
) -> list[str]:
    """List all `config_secret_name` options that are known for the given connector.

    This can be used to find out which already-created config secret names are available
    for a given connector. The return value is a list of secret names, but it will not
    return the actual secret values.
    """
    secrets_names: list[str] = []
    for secrets_mgr in _get_secret_sources():
        if isinstance(secrets_mgr, GoogleGSMSecretManager):
            secrets_names.extend(
                [
                    secret_handle.secret_name.split("/")[-1]
                    for secret_handle in secrets_mgr.fetch_connector_secrets(connector_name)
                ]
            )

    return secrets_names


@mcp_tool(
    read_only=True,
    idempotent=True,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def list_dotenv_secrets() -> dict[str, list[str]]:
    """List all environment variable names declared within declared .env files.

    This returns a dictionary mapping the .env file name to a list of environment
    variable names. The values of the environment variables are not returned.
    """
    result: dict[str, list[str]] = {}
    for secrets_mgr in _get_secret_sources():
        if isinstance(secrets_mgr, DotenvSecretManager) and secrets_mgr.dotenv_path:
            result[str(secrets_mgr.dotenv_path.resolve())] = secrets_mgr.list_secrets_names()

    return result


@mcp_tool(
    read_only=True,
    idempotent=True,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def list_source_streams(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[str]:
    """List all streams available in a source connector.

    This operation (generally) requires a valid configuration, including any required secrets.
    """
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    return source.get_available_streams()


@mcp_tool(
    read_only=True,
    idempotent=True,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def get_source_stream_json_schema(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """List all properties for a specific stream in a source connector."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    return source.get_stream_json_schema(stream_name=stream_name)


@mcp_tool(
    read_only=True,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def read_source_stream_records(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to read records from."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="The maximum number of records to read.",
            default=1000,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> list[dict[str, Any]] | str:
    """Get records from a source connector."""
    try:
        source: Source = _get_mcp_source(
            connector_name=source_connector_name,
            override_execution_mode=override_execution_mode,
            manifest_path=manifest_path,
        )
        config_dict = resolve_connector_config(
            config=config,
            config_file=config_file,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)
        # First we get a generator for the records in the specified stream.
        record_generator = source.get_records(stream_name)
        # Next we load a limited number of records from the generator into our list.
        records: list[dict[str, Any]] = list(islice(record_generator, max_records))

        # Write the status line to stderr so it does not mix with the tool's output.
        print(f"Retrieved {len(records)} records from stream '{stream_name}'", file=sys.stderr)

    except Exception as ex:
        tb_str = traceback.format_exc()
        # If any error occurs, we return an error message string that includes the traceback.
        return (
            f"Error reading records from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
        )

    else:
        return records


@mcp_tool(
    read_only=True,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def get_stream_previews(
    source_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str | None,
        Field(
            description=(
                "The streams to get previews for. "
                "Use '*' for all streams, or None for selected streams."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="The maximum number of sample records to return per stream.",
            default=10,
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> dict[str, list[dict[str, Any]] | str]:
    """Get sample records (previews) from streams in a source connector.

    This operation requires a valid configuration, including any required secrets.
    Returns a dictionary mapping stream names to lists of sample records, or an error
    message string if an error occurred for that stream.
    """
    source: Source = _get_mcp_source(
        connector_name=source_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )

    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)

    streams_param: list[str] | Literal["*"] | None = resolve_list_of_strings(
        streams
    )  # pyrefly: ignore[no-matching-overload]
    if streams_param and len(streams_param) == 1 and streams_param[0] == "*":
        streams_param = "*"

    try:
        samples_result = source.get_samples(
            streams=streams_param,
            limit=limit,
            on_error="ignore",
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return {
            "ERROR": f"Error getting stream previews from source '{source_name}': "
            f"{ex!r}, {ex!s}\n{tb_str}"
        }

    result: dict[str, list[dict[str, Any]] | str] = {}
    for stream_name, dataset in samples_result.items():
        if dataset is None:
            result[stream_name] = f"Could not retrieve stream samples for stream '{stream_name}'"
        else:
            result[stream_name] = list(dataset)

    return result


@mcp_tool(
    destructive=False,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def sync_source_to_cache(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector."),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector as a dict or JSON string.",
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the source connector config.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    streams: Annotated[
        list[str] | str,
        Field(
            description="The streams to sync.",
            default="suggested",
        ),
    ],
    override_execution_mode: Annotated[
        Literal["docker", "python", "yaml", "auto"],
        Field(
            description="Optionally override the execution method to use for the connector. "
            "This parameter is ignored if manifest_path is provided (yaml mode will be used).",
            default="auto",
        ),
    ],
    manifest_path: Annotated[
        str | Path | None,
        Field(
            description="Path to a local YAML manifest file for declarative connectors.",
            default=None,
        ),
    ],
) -> str:
    """Run a sync from a source connector to the default DuckDB cache."""
    source: Source = _get_mcp_source(
        connector_name=source_connector_name,
        override_execution_mode=override_execution_mode,
        manifest_path=manifest_path,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict)
    cache = get_default_cache()

    streams = resolve_list_of_strings(streams)
    if streams and len(streams) == 1 and streams[0] in {"*", "suggested"}:
        # Float '*' and 'suggested' to the top-level for special processing:
        streams = streams[0]

    if isinstance(streams, str) and streams == "suggested":
        streams = "*"  # Default to all streams if 'suggested' is not otherwise specified.
        try:
            metadata = get_connector_metadata(
                source_connector_name,
            )
        except Exception:
            streams = "*"  # Fallback to all streams if suggested streams fail.
        else:
            if metadata is not None:
                streams = metadata.suggested_streams or "*"

    if isinstance(streams, str) and streams != "*":
        streams = [streams]  # Ensure streams is a list

    source.read(
        cache=cache,
        streams=streams,
    )
    del cache  # Ensure the cache is closed properly

    summary: str = f"Sync completed for '{source_connector_name}'!\n\n"
    summary += "Data written to default DuckDB cache\n"
    return summary


class CachedDatasetInfo(BaseModel):
    """Class to hold information about a cached dataset."""

    stream_name: str
    """The name of the stream in the cache."""
    table_name: str
    schema_name: str | None = None


@mcp_tool(
    read_only=True,
    idempotent=True,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def list_cached_streams() -> list[CachedDatasetInfo]:
    """List all streams available in the default DuckDB cache."""
    cache: DuckDBCache = get_default_cache()
    result = [
        CachedDatasetInfo(
            stream_name=stream_name,
            table_name=(cache.table_prefix or "") + stream_name,
            schema_name=cache.schema_name,
        )
        for stream_name in cache.streams
    ]
    del cache  # Ensure the cache is closed properly
    return result


@mcp_tool(
    read_only=True,
    idempotent=True,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def describe_default_cache() -> dict[str, Any]:
    """Describe the currently configured default cache."""
    cache = get_default_cache()
    return {
        "cache_type": type(cache).__name__,
        "cache_dir": str(cache.cache_dir),
        "cache_db_path": str(Path(cache.db_path).absolute()),
        "cached_streams": list(cache.streams.keys()),
    }


def _is_safe_sql(sql_query: str) -> bool:
    """Check if a SQL query is safe to execute.

    For security reasons, we only allow read-only operations like SELECT, DESCRIBE, and SHOW.
    Multi-statement queries (containing semicolons) are also disallowed for security.

    Note: SQLAlchemy will also validate downstream, but this is a first-pass check.

    Args:
        sql_query: The SQL query to check

    Returns:
        True if the query is safe to execute, False otherwise
    """
    # Remove leading/trailing whitespace and convert to uppercase for checking
    normalized_query = sql_query.strip().upper()

    # Disallow multi-statement queries (containing semicolons)
    # Note: We check the original query to catch semicolons anywhere, including in comments
    if ";" in sql_query:
        return False

    # List of allowed SQL statement prefixes (read-only operations)
    allowed_prefixes = (
        "SELECT",
        "DESCRIBE",
        "DESC",  # Short form of DESCRIBE
        "SHOW",
        "EXPLAIN",  # Also safe - shows query execution plan
    )

    # Check if the query starts with any allowed prefix
    return any(normalized_query.startswith(prefix) for prefix in allowed_prefixes)


@mcp_tool(
    read_only=True,
    idempotent=True,
    requires_client_filesystem=True,
    extra_help_text=_CONFIG_HELP,
)
def run_sql_query(
    sql_query: Annotated[
        str,
        Field(description="The SQL query to execute."),
    ],
    max_records: Annotated[
        int,
        Field(
            description="Maximum number of records to return.",
            default=1000,
        ),
    ],
) -> list[dict[str, Any]]:
    """Run a SQL query against the default cache.

    The dialect of SQL should match the dialect of the default cache.
    Use `describe_default_cache` to see the cache type.

    For DuckDB-type caches:
    - Use `SHOW TABLES` to list all tables.
    - Use `DESCRIBE <table_name>` to get the schema of a specific table

    For security reasons, only read-only operations are allowed: SELECT, DESCRIBE, SHOW, EXPLAIN.
    """
    # Check if the query is safe to execute
    if not _is_safe_sql(sql_query):
        return [
            {
                "ERROR": "Unsafe SQL query detected. Only read-only operations are allowed: "
                "SELECT, DESCRIBE, SHOW, EXPLAIN",
                "SQL_QUERY": sql_query,
            }
        ]

    cache: DuckDBCache = get_default_cache()
    try:
        return cache.run_sql_query(
            sql_query,
            max_records=max_records,
        )
    except Exception as ex:
        tb_str = traceback.format_exc()
        return [
            {
                "ERROR": f"Error running SQL query: {ex!r}, {ex!s}",
                "TRACEBACK": tb_str,
                "SQL_QUERY": sql_query,
            }
        ]
    finally:
        del cache  # Ensure the cache is closed properly


@mcp_tool(
    destructive=True,
    requires_client_filesystem=True,
)
def destination_smoke_test(  # noqa: PLR0913, PLR0917
    destination_connector_name: Annotated[
        str,
        Field(
            description=(
                "The name of the destination connector to test "
                "(e.g. 'destination-snowflake', 'destination-motherduck')."
            ),
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description=(
                "The destination configuration as a dict object or JSON string. "
                "Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead."
            ),
            default=None,
        ),
    ],
    config_file: Annotated[
        str | Path | None,
        Field(
            description="Path to a YAML or JSON file containing the destination configuration.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the destination configuration.",
            default=None,
        ),
    ],
    scenarios: Annotated[
        list[str] | str,
        Field(
            description=(
                "Which scenarios to run. Use 'fast' (default) for all fast predefined "
                "scenarios (excludes large_batch_stream), 'all' for every predefined "
                "scenario including large batch, or provide a list of scenario names "
                "or a comma-separated string."
            ),
            default="fast",
        ),
    ],
    custom_scenarios: Annotated[
        list[dict[str, Any]] | None,
        Field(
            description=(
                "Additional custom test scenarios to inject. Each scenario should define "
                "'name', 'json_schema', and optionally 'records' and 'primary_key'. "
                "These are unioned with the predefined scenarios."
            ),
            default=None,
        ),
    ],
    docker_image: Annotated[
        str | None,
        Field(
            description=(
                "Optional Docker image override for the destination connector "
                "(e.g. 'airbyte/destination-snowflake:3.14.0')."
            ),
            default=None,
        ),
    ],
    namespace_suffix: Annotated[
        str | None,
        Field(
            description=(
                "Optional suffix appended to the auto-generated namespace. "
                "Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). "
                "Use this to distinguish concurrent runs."
            ),
            default=None,
        ),
    ],
    reuse_namespace: Annotated[
        str | None,
        Field(
            description=(
                "Exact namespace to reuse from a previous run. "
                "When set, no new namespace is generated. "
                "Useful for running a second test against an already-populated namespace."
            ),
            default=None,
        ),
    ],
    skip_preflight: Annotated[
        bool,
        Field(
            description=(
                "Skip the automatic preflight check that runs basic_types before "
                "the requested scenarios. Set to true when you expect basic_types "
                "itself to fail or want to save time on repeated runs."
            ),
            default=False,
        ),
    ],
) -> DestinationSmokeTestResult:
    """Run smoke tests against a destination connector.

    Sends synthetic test data from the smoke test source to the specified
    destination and reports success or failure. The smoke test source generates
    data across predefined scenarios covering common destination failure patterns:
    type variations, null handling, naming edge cases, schema variations, and
    batch sizes.

    When the destination has a compatible cache implementation (DuckDB,
    Postgres, Snowflake, BigQuery, MotherDuck), readback introspection is
    automatically performed after a successful write. The readback produces
    stats on the written data: table row counts, column names/types, and
    per-column null/non-null counts. Results are included in the response
    as `table_statistics` and `tables_not_found`.
    """
    # Resolve destination config
    config_dict = resolve_connector_config(
        config=config,
        config_file=config_file,
        config_secret_name=config_secret_name,
    )

    # Set up destination
    destination_kwargs: dict[str, Any] = {
        "name": destination_connector_name,
        "config": config_dict,
    }
    if docker_image:
        destination_kwargs["docker_image"] = docker_image
    elif is_docker_installed():
        destination_kwargs["docker_image"] = True

    destination_obj = get_destination(**destination_kwargs)

    # Resolve scenarios for the shared helper
    resolved_scenarios: str | list[str]
    if isinstance(scenarios, str):
        resolved_scenarios = scenarios
    else:
        resolved_scenarios = resolve_list_of_strings(scenarios) or "fast"

    return run_destination_smoke_test(
        destination=destination_obj,
        scenarios=resolved_scenarios,
        namespace_suffix=namespace_suffix,
        reuse_namespace=reuse_namespace,
        custom_scenarios=custom_scenarios,
        skip_preflight=skip_preflight,
    )


def register_local_tools(app: FastMCP) -> None:
    """Register local tools with the FastMCP app.

    Args:
        app: FastMCP application instance
    """
    register_mcp_tools(app, mcp_module=__name__)