airbyte_cdk.test.standard_tests

FAST Airbyte Standard Tests

This module provides a set of base classes for declarative connector test suites. The goal of this module is to provide a robust and extensible framework for testing Airbyte connectors.

Example usage:

# `test_airbyte_standards.py`
from airbyte_cdk.test import standard_tests

pytest_plugins = [
    "airbyte_cdk.test.standard_tests.pytest_hooks",
]


class TestSuiteSourcePokeAPI(standard_tests.DeclarativeSourceTestSuite):
    """Test suite for the source."""

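Available test suite base classes:

  • DeclarativeSourceTestSuite: A test suite for declarative sources.
  • SourceTestSuiteBase: A test suite for sources.
  • DestinationTestSuiteBase: A test suite for destinations.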

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2'''FAST Airbyte Standard Tests
 3
 4This module provides a set of base classes for declarative connector test suites.
 5The goal of this module is to provide a robust and extensible framework for testing Airbyte
 6connectors.
 7
 8Example usage:
 9
10```python
11# `test_airbyte_standards.py`
12from airbyte_cdk.test import standard_tests
13
14pytest_plugins = [
15    "airbyte_cdk.test.standard_tests.pytest_hooks",
16]
17
18
19class TestSuiteSourcePokeAPI(standard_tests.DeclarativeSourceTestSuite):
20    """Test suite for the source."""
21```
22
23Available test suites base classes:
24- `DeclarativeSourceTestSuite`: A test suite for declarative sources.
25- `SourceTestSuiteBase`: A test suite for sources.
26- `DestinationTestSuiteBase`: A test suite for destinations.
27
28'''
29
30from airbyte_cdk.test.models.scenario import ConnectorTestScenario
31from airbyte_cdk.test.standard_tests.connector_base import ConnectorTestSuiteBase
32from airbyte_cdk.test.standard_tests.declarative_sources import (
33    DeclarativeSourceTestSuite,
34)
35from airbyte_cdk.test.standard_tests.destination_base import DestinationTestSuiteBase
36from airbyte_cdk.test.standard_tests.source_base import SourceTestSuiteBase
37
38__all__ = [
39    "ConnectorTestScenario",
40    "ConnectorTestSuiteBase",
41    "DeclarativeSourceTestSuite",
42    "DestinationTestSuiteBase",
43    "SourceTestSuiteBase",
44]
class ConnectorTestScenario(pydantic.main.BaseModel):
 28class ConnectorTestScenario(BaseModel):
 29    """Acceptance test scenario, as a Pydantic model.
 30
 31    This class represents an acceptance test scenario, which is a single test case
 32    that can be run against a connector. It is used to deserialize and validate the
 33    acceptance test configuration file.
 34    """
 35
 36    # Allows the class to be hashable, which PyTest will require
 37    # when we use to parameterize tests.
 38    model_config = ConfigDict(frozen=True)
 39
 40    class AcceptanceTestExpectRecords(BaseModel):
 41        path: Path
 42        exact_order: bool = False
 43
 44    class AcceptanceTestFileTypes(BaseModel):
 45        skip_test: bool
 46        bypass_reason: str
 47
 48    class AcceptanceTestEmptyStream(BaseModel):
 49        name: str
 50        bypass_reason: str | None = None
 51
 52        # bypass reason does not affect equality
 53        def __hash__(self) -> int:
 54            return hash(self.name)
 55
 56    config_path: Path | None = None
 57    config_dict: dict[str, Any] | None = None
 58
 59    _id: str | None = None  # Used to override the default ID generation
 60
 61    configured_catalog_path: Path | None = None
 62    empty_streams: list[AcceptanceTestEmptyStream] | None = None
 63    timeout_seconds: int | None = None
 64    expect_records: AcceptanceTestExpectRecords | None = None
 65    file_types: AcceptanceTestFileTypes | None = None
 66    status: Literal["succeed", "failed", "exception"] | None = None
 67
 68    def get_config_dict(
 69        self,
 70        *,
 71        connector_root: Path,
 72        empty_if_missing: bool,
 73    ) -> dict[str, Any]:
 74        """Return the config dictionary.
 75
 76        If a config dictionary has already been loaded, return it. Otherwise, load
 77        the config file and return the dictionary.
 78
 79        If `self.config_dict` and `self.config_path` are both `None`:
 80        - return an empty dictionary if `empty_if_missing` is True
 81        - raise a ValueError if `empty_if_missing` is False
 82        """
 83        if self.config_dict is not None:
 84            return self.config_dict
 85
 86        if self.config_path is not None:
 87            config_path = self.config_path
 88            if not config_path.is_absolute():
 89                # We usually receive a relative path here. Let's resolve it.
 90                config_path = (connector_root / self.config_path).resolve().absolute()
 91
 92            return cast(
 93                dict[str, Any],
 94                yaml.safe_load(config_path.read_text()),
 95            )
 96
 97        if empty_if_missing:
 98            return {}
 99
100        raise ValueError("No config dictionary or path provided.")
101
102    @property
103    def expected_outcome(self) -> ExpectedOutcome:
104        """Whether the test scenario expects an exception to be raised.
105
106        Returns True if the scenario expects an exception, False if it does not,
107        and None if there is no set expectation.
108        """
109        return ExpectedOutcome.from_status_str(self.status)
110
111    @property
112    def id(self) -> str:
113        """Return a unique identifier for the test scenario.
114
115        This is used by PyTest to identify the test scenario.
116        """
117        if self._id:
118            return self._id
119
120        if self.config_path:
121            return self.config_path.stem
122
123        return str(hash(self))
124
125    def __str__(self) -> str:
126        return f"'{self.id}' Test Scenario"
127
128    @contextmanager
129    def with_temp_config_file(
130        self,
131        connector_root: Path,
132    ) -> Generator[Path, None, None]:
133        """Yield a temporary JSON file path containing the config dict and delete it on exit."""
134        config = self.get_config_dict(
135            empty_if_missing=True,
136            connector_root=connector_root,
137        )
138        with tempfile.NamedTemporaryFile(
139            prefix="config-",
140            suffix=".json",
141            mode="w",
142            delete=False,  # Don't fail if cannot delete the file on exit
143            encoding="utf-8",
144        ) as temp_file:
145            temp_file.write(json.dumps(config))
146            temp_file.flush()
147            # Allow the file to be read by other processes
148            temp_path = Path(temp_file.name)
149            temp_path.chmod(temp_path.stat().st_mode | 0o444)
150            yield temp_path
151
152        # attempt cleanup, ignore errors
153        with suppress(OSError):
154            temp_path.unlink()
155
156    def without_expected_outcome(self) -> ConnectorTestScenario:
157        """Return a copy of the scenario that does not expect failure or success.
158
159        This is useful when running multiple steps, to defer the expectations to a later step.
160        """
161        return ConnectorTestScenario(
162            **self.model_dump(exclude={"status"}),
163        )
164
165    def with_expecting_failure(self) -> ConnectorTestScenario:
166        """Return a copy of the scenario that expects failure.
167
168        This is useful when deriving new scenarios from existing ones.
169        """
170        if self.status == "failed":
171            return self
172
173        return ConnectorTestScenario(
174            **self.model_dump(exclude={"status"}),
175            status="failed",
176        )
177
178    def with_expecting_success(self) -> ConnectorTestScenario:
179        """Return a copy of the scenario that expects success.
180
181        This is useful when deriving new scenarios from existing ones.
182        """
183        if self.status == "succeed":
184            return self
185
186        return ConnectorTestScenario(
187            **self.model_dump(exclude={"status"}),
188            status="succeed",
189        )
190
191    @property
192    def requires_creds(self) -> bool:
193        """Return True if the scenario requires credentials to run."""
194        return bool(self.config_path and "secrets" in self.config_path.parts)

Acceptance test scenario, as a Pydantic model.

This class represents an acceptance test scenario, which is a single test case that can be run against a connector. It is used to deserialize and validate the acceptance test configuration file.

model_config = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

config_path: pathlib.Path | None
config_dict: dict[str, typing.Any] | None
configured_catalog_path: pathlib.Path | None
timeout_seconds: int | None
status: Optional[Literal['succeed', 'failed', 'exception']]
def get_config_dict( self, *, connector_root: pathlib.Path, empty_if_missing: bool) -> dict[str, typing.Any]:
 68    def get_config_dict(
 69        self,
 70        *,
 71        connector_root: Path,
 72        empty_if_missing: bool,
 73    ) -> dict[str, Any]:
 74        """Return the config dictionary.
 75
 76        If a config dictionary has already been loaded, return it. Otherwise, load
 77        the config file and return the dictionary.
 78
 79        If `self.config_dict` and `self.config_path` are both `None`:
 80        - return an empty dictionary if `empty_if_missing` is True
 81        - raise a ValueError if `empty_if_missing` is False
 82        """
 83        if self.config_dict is not None:
 84            return self.config_dict
 85
 86        if self.config_path is not None:
 87            config_path = self.config_path
 88            if not config_path.is_absolute():
 89                # We usually receive a relative path here. Let's resolve it.
 90                config_path = (connector_root / self.config_path).resolve().absolute()
 91
 92            return cast(
 93                dict[str, Any],
 94                yaml.safe_load(config_path.read_text()),
 95            )
 96
 97        if empty_if_missing:
 98            return {}
 99
100        raise ValueError("No config dictionary or path provided.")

Return the config dictionary.

If a config dictionary has already been loaded, return it. Otherwise, load the config file and return the dictionary.

If self.config_dict and self.config_path are both None:

  • return an empty dictionary if empty_if_missing is True
  • raise a ValueError if empty_if_missing is False
expected_outcome: airbyte_cdk.test.models.ExpectedOutcome
102    @property
103    def expected_outcome(self) -> ExpectedOutcome:
104        """Whether the test scenario expects an exception to be raised.
105
106        Returns True if the scenario expects an exception, False if it does not,
107        and None if there is no set expectation.
108        """
109        return ExpectedOutcome.from_status_str(self.status)

The expected outcome of the test scenario.

Returns the ExpectedOutcome value derived from the scenario's status via ExpectedOutcome.from_status_str(self.status).

id: str
111    @property
112    def id(self) -> str:
113        """Return a unique identifier for the test scenario.
114
115        This is used by PyTest to identify the test scenario.
116        """
117        if self._id:
118            return self._id
119
120        if self.config_path:
121            return self.config_path.stem
122
123        return str(hash(self))

Return a unique identifier for the test scenario.

This is used by PyTest to identify the test scenario.

@contextmanager
def with_temp_config_file( self, connector_root: pathlib.Path) -> Generator[pathlib.Path, None, None]:
128    @contextmanager
129    def with_temp_config_file(
130        self,
131        connector_root: Path,
132    ) -> Generator[Path, None, None]:
133        """Yield a temporary JSON file path containing the config dict and delete it on exit."""
134        config = self.get_config_dict(
135            empty_if_missing=True,
136            connector_root=connector_root,
137        )
138        with tempfile.NamedTemporaryFile(
139            prefix="config-",
140            suffix=".json",
141            mode="w",
142            delete=False,  # Don't fail if cannot delete the file on exit
143            encoding="utf-8",
144        ) as temp_file:
145            temp_file.write(json.dumps(config))
146            temp_file.flush()
147            # Allow the file to be read by other processes
148            temp_path = Path(temp_file.name)
149            temp_path.chmod(temp_path.stat().st_mode | 0o444)
150            yield temp_path
151
152        # attempt cleanup, ignore errors
153        with suppress(OSError):
154            temp_path.unlink()

Yield a temporary JSON file path containing the config dict and delete it on exit.

def without_expected_outcome(self) -> ConnectorTestScenario:
156    def without_expected_outcome(self) -> ConnectorTestScenario:
157        """Return a copy of the scenario that does not expect failure or success.
158
159        This is useful when running multiple steps, to defer the expectations to a later step.
160        """
161        return ConnectorTestScenario(
162            **self.model_dump(exclude={"status"}),
163        )

Return a copy of the scenario that does not expect failure or success.

This is useful when running multiple steps, to defer the expectations to a later step.

def with_expecting_failure(self) -> ConnectorTestScenario:
165    def with_expecting_failure(self) -> ConnectorTestScenario:
166        """Return a copy of the scenario that expects failure.
167
168        This is useful when deriving new scenarios from existing ones.
169        """
170        if self.status == "failed":
171            return self
172
173        return ConnectorTestScenario(
174            **self.model_dump(exclude={"status"}),
175            status="failed",
176        )

Return a copy of the scenario that expects failure.

This is useful when deriving new scenarios from existing ones.

def with_expecting_success(self) -> ConnectorTestScenario:
178    def with_expecting_success(self) -> ConnectorTestScenario:
179        """Return a copy of the scenario that expects success.
180
181        This is useful when deriving new scenarios from existing ones.
182        """
183        if self.status == "succeed":
184            return self
185
186        return ConnectorTestScenario(
187            **self.model_dump(exclude={"status"}),
188            status="succeed",
189        )

Return a copy of the scenario that expects success.

This is useful when deriving new scenarios from existing ones.

requires_creds: bool
191    @property
192    def requires_creds(self) -> bool:
193        """Return True if the scenario requires credentials to run."""
194        return bool(self.config_path and "secrets" in self.config_path.parts)

Return True if the scenario requires credentials to run.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise the private attributes of a model instance.

    This function is meant to behave like a BaseModel method. It takes context as an
    argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    # Leave already-initialised private state untouched.
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    resolved_defaults = (
        (name, private_attr.get_default())
        for name, private_attr in self.__private_attributes__.items()
    )
    # Only attributes that actually declare a default are pre-populated.
    private_values = {
        name: default for name, default in resolved_defaults if default is not PydanticUndefined
    }
    object_setattr(self, '__pydantic_private__', private_values)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
class ConnectorTestScenario.AcceptanceTestExpectRecords(pydantic.main.BaseModel):
    class AcceptanceTestExpectRecords(BaseModel):
        """Model for the scenario's `expect_records` setting."""

        # presumably the path to the file holding the expected records — confirm
        path: Path
        # presumably whether records must match in the exact order listed — confirm
        exact_order: bool = False

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
path: pathlib.Path
exact_order: bool
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ConnectorTestScenario.AcceptanceTestFileTypes(pydantic.main.BaseModel):
    class AcceptanceTestFileTypes(BaseModel):
        """Model for the scenario's `file_types` setting."""

        # presumably whether the file-types test is skipped — confirm
        skip_test: bool
        # presumably the justification for skipping — confirm
        bypass_reason: str

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
skip_test: bool
bypass_reason: str
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ConnectorTestScenario.AcceptanceTestEmptyStream(pydantic.main.BaseModel):
    class AcceptanceTestEmptyStream(BaseModel):
        """Model for one entry of the scenario's `empty_streams` list."""

        name: str
        bypass_reason: str | None = None

        # Hash on the name only, so `bypass_reason` does not contribute to the hash.
        def __hash__(self) -> int:
            return hash(self.name)

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
name: str
bypass_reason: str | None
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

 27class ConnectorTestSuiteBase(DockerConnectorTestSuite):
 28    """Base class for Python connector test suites."""
 29
 30    connector: type[IConnector] | Callable[[], IConnector] | None  # type: ignore [reportRedeclaration]
 31    """The connector class or a factory function that returns an scenario of IConnector."""
 32
 33    @classproperty  # type: ignore [no-redef]
 34    def connector(cls) -> type[IConnector] | Callable[[], IConnector] | None:
 35        """Get the connector class for the test suite.
 36
 37        This assumes a python connector and should be overridden by subclasses to provide the
 38        specific connector class to be tested.
 39        """
 40        connector_root = cls.get_connector_root_dir()
 41        connector_name = cls.connector_name
 42
 43        expected_module_name = connector_name.replace("-", "_").lower()
 44        expected_class_name = connector_name.replace("-", "_").title().replace("_", "")
 45
 46        # dynamically import and get the connector class: <expected_module_name>.<expected_class_name>
 47
 48        cwd_snapshot = Path().absolute()
 49        os.chdir(connector_root)
 50
 51        # Dynamically import the module
 52        try:
 53            module = importlib.import_module(expected_module_name)
 54        except ModuleNotFoundError as e:
 55            raise ImportError(
 56                f"Could not import module '{expected_module_name}'. "
 57                "Please ensure you are running from within the connector's virtual environment, "
 58                "for instance by running `poetry run airbyte-cdk connector test` from the "
 59                "connector directory. If the issue persists, check that the connector "
 60                f"module matches the expected module name '{expected_module_name}' and that the "
 61                f"connector class matches the expected class name '{expected_class_name}'. "
 62                "Alternatively, you can run `airbyte-cdk image test` to run a subset of tests "
 63                "against the connector's image."
 64            ) from e
 65        finally:
 66            # Change back to the original working directory
 67            os.chdir(cwd_snapshot)
 68
 69        # Dynamically get the class from the module
 70        try:
 71            return cast(type[IConnector], getattr(module, expected_class_name))
 72        except AttributeError as e:
 73            # We did not find it based on our expectations, so let's check if we can find it
 74            # with a case-insensitive match.
 75            matching_class_name = next(
 76                (name for name in dir(module) if name.lower() == expected_class_name.lower()),
 77                None,
 78            )
 79            if not matching_class_name:
 80                raise ImportError(
 81                    f"Module '{expected_module_name}' does not have a class named '{expected_class_name}'."
 82                ) from e
 83            return cast(type[IConnector], getattr(module, matching_class_name))
 84
 85    @classmethod
 86    def create_connector(
 87        cls,
 88        scenario: ConnectorTestScenario | None,
 89    ) -> IConnector:
 90        """Instantiate the connector class."""
 91        connector = cls.connector  # type: ignore
 92        if connector:
 93            if callable(connector) or isinstance(connector, type):
 94                # If the connector is a class or factory function, instantiate it:
 95                return cast(IConnector, connector())  # type: ignore [redundant-cast]
 96
 97        # Otherwise, we can't instantiate the connector. Fail with a clear error message.
 98        raise NotImplementedError(
 99            "No connector class or connector factory function provided. "
100            "Please provide a class or factory function in `cls.connector`, or "
101            "override `cls.create_connector()` to define a custom initialization process."
102        )
103
104    # Test Definitions
105
106    def test_check(
107        self,
108        scenario: ConnectorTestScenario,
109    ) -> None:
110        """Run `connection` acceptance tests."""
111        result: entrypoint_wrapper.EntrypointOutput = run_test_job(
112            self.create_connector(scenario),
113            "check",
114            test_scenario=scenario,
115            connector_root=self.get_connector_root_dir(),
116        )
117        assert len(result.connection_status_messages) == 1, (
118            f"Expected exactly one CONNECTION_STATUS message. "
119            "Got: {result.connection_status_messages!s}"
120        )

Base class for Python connector test suites.

def connector(unknown):

The connector class or a factory function that returns an instance of IConnector.

@classmethod
def create_connector( cls, scenario: ConnectorTestScenario | None) -> airbyte_cdk.test.standard_tests._job_runner.IConnector:
 85    @classmethod
 86    def create_connector(
 87        cls,
 88        scenario: ConnectorTestScenario | None,
 89    ) -> IConnector:
 90        """Instantiate the connector class."""
 91        connector = cls.connector  # type: ignore
 92        if connector:
 93            if callable(connector) or isinstance(connector, type):
 94                # If the connector is a class or factory function, instantiate it:
 95                return cast(IConnector, connector())  # type: ignore [redundant-cast]
 96
 97        # Otherwise, we can't instantiate the connector. Fail with a clear error message.
 98        raise NotImplementedError(
 99            "No connector class or connector factory function provided. "
100            "Please provide a class or factory function in `cls.connector`, or "
101            "override `cls.create_connector()` to define a custom initialization process."
102        )

Instantiate the connector class.

def test_check( self, scenario: ConnectorTestScenario) -> None:
106    def test_check(
107        self,
108        scenario: ConnectorTestScenario,
109    ) -> None:
110        """Run `connection` acceptance tests."""
111        result: entrypoint_wrapper.EntrypointOutput = run_test_job(
112            self.create_connector(scenario),
113            "check",
114            test_scenario=scenario,
115            connector_root=self.get_connector_root_dir(),
116        )
117        assert len(result.connection_status_messages) == 1, (
118            f"Expected exactly one CONNECTION_STATUS message. "
119            "Got: {result.connection_status_messages!s}"
120        )

Run connection acceptance tests.

class DeclarativeSourceTestSuite(airbyte_cdk.test.standard_tests.SourceTestSuiteBase):
 28class DeclarativeSourceTestSuite(SourceTestSuiteBase):
 29    """Declarative source test suite.
 30
 31    This inherits from the Python-based source test suite and implements the
 32    `create_connector` method to create a declarative source object instead of
 33    requiring a custom Python source object.
 34
 35    The class also automatically locates the `manifest.yaml` file and the
 36    `components.py` file (if it exists) in the connector's directory.
 37    """
 38
 39    connector: type[IConnector] | None = None
 40
 41    @classproperty
 42    def manifest_yaml_path(cls) -> Path:
 43        """Get the path to the manifest.yaml file."""
 44        result = cls.get_connector_root_dir() / MANIFEST_YAML
 45        if result.exists():
 46            return result
 47
 48        raise FileNotFoundError(
 49            f"Manifest YAML file not found at {result}. "
 50            "Please ensure that the test suite is run in the correct directory.",
 51        )
 52
 53    @classproperty
 54    def components_py_path(cls) -> Path | None:
 55        """Get the path to the `components.py` file, if one exists.
 56
 57        If not `components.py` file exists, return None.
 58        """
 59        result = cls.get_connector_root_dir() / "components.py"
 60        if result.exists():
 61            return result
 62
 63        return None
 64
 65    @classmethod
 66    def create_connector(
 67        cls,
 68        scenario: ConnectorTestScenario | None,
 69    ) -> IConnector:
 70        """Create a connector scenario for the test suite.
 71
 72        This overrides `create_connector` from the create a declarative source object
 73        instead of requiring a custom python source object.
 74
 75        Subclasses should not need to override this method.
 76        """
 77        scenario = scenario or ConnectorTestScenario()  # Use default (empty) scenario if None
 78        manifest_dict = yaml.safe_load(cls.manifest_yaml_path.read_text())
 79        config = {
 80            "__injected_manifest": manifest_dict,
 81        }
 82        config.update(
 83            scenario.get_config_dict(
 84                empty_if_missing=True,
 85                connector_root=cls.get_connector_root_dir(),
 86            ),
 87        )
 88
 89        if cls.components_py_path and cls.components_py_path.exists():
 90            os.environ["AIRBYTE_ENABLE_UNSAFE_CODE"] = "true"
 91            config["__injected_components_py"] = cls.components_py_path.read_text()
 92            config["__injected_components_py_checksums"] = {
 93                "md5": md5_checksum(cls.components_py_path),
 94            }
 95
 96        return cast(
 97            IConnector,
 98            ConcurrentDeclarativeSource(
 99                config=config,
100                catalog=None,
101                state=None,
102                source_config=manifest_dict,
103            ),
104        )

Declarative source test suite.

This inherits from the Python-based source test suite and implements the create_connector method to create a declarative source object instead of requiring a custom Python source object.

The class also automatically locates the manifest.yaml file and the components.py file (if it exists) in the connector's directory.

connector: type[airbyte_cdk.test.standard_tests._job_runner.IConnector] | None = None

The connector class or a factory function that returns an instance of IConnector.

def manifest_yaml_path(unknown):

Much like a property, but the wrapped get function is a class method. For simplicity, only read-only properties are implemented.

def components_py_path(unknown):

Much like a property, but the wrapped get function is a class method. For simplicity, only read-only properties are implemented.

@classmethod
def create_connector( cls, scenario: ConnectorTestScenario | None) -> airbyte_cdk.test.standard_tests._job_runner.IConnector:
 65    @classmethod
 66    def create_connector(
 67        cls,
 68        scenario: ConnectorTestScenario | None,
 69    ) -> IConnector:
 70        """Create a connector scenario for the test suite.
 71
 72        This overrides `create_connector` from the create a declarative source object
 73        instead of requiring a custom python source object.
 74
 75        Subclasses should not need to override this method.
 76        """
 77        scenario = scenario or ConnectorTestScenario()  # Use default (empty) scenario if None
 78        manifest_dict = yaml.safe_load(cls.manifest_yaml_path.read_text())
 79        config = {
 80            "__injected_manifest": manifest_dict,
 81        }
 82        config.update(
 83            scenario.get_config_dict(
 84                empty_if_missing=True,
 85                connector_root=cls.get_connector_root_dir(),
 86            ),
 87        )
 88
 89        if cls.components_py_path and cls.components_py_path.exists():
 90            os.environ["AIRBYTE_ENABLE_UNSAFE_CODE"] = "true"
 91            config["__injected_components_py"] = cls.components_py_path.read_text()
 92            config["__injected_components_py_checksums"] = {
 93                "md5": md5_checksum(cls.components_py_path),
 94            }
 95
 96        return cast(
 97            IConnector,
 98            ConcurrentDeclarativeSource(
 99                config=config,
100                catalog=None,
101                state=None,
102                source_config=manifest_dict,
103            ),
104        )

Create a connector instance for the test suite.

This overrides create_connector from the base class to create a declarative source object instead of requiring a custom Python source object.

Subclasses should not need to override this method.

class DestinationTestSuiteBase(airbyte_cdk.test.standard_tests.ConnectorTestSuiteBase):
 8class DestinationTestSuiteBase(ConnectorTestSuiteBase):
 9    """Base class for destination test suites.
10
11    This class provides a base set of functionality for testing destination connectors, and it
12    inherits all generic connector tests from the `ConnectorTestSuiteBase` class.
13
14    TODO: As of now, this class does not add any additional functionality or tests specific to
15    destination connectors. However, it serves as a placeholder for future enhancements and
16    customizations that may be needed for destination connectors.
17    """

Base class for destination test suites.

This class provides a base set of functionality for testing destination connectors, and it inherits all generic connector tests from the ConnectorTestSuiteBase class.

TODO: As of now, this class does not add any additional functionality or tests specific to destination connectors. However, it serves as a placeholder for future enhancements and customizations that may be needed for destination connectors.

class SourceTestSuiteBase(airbyte_cdk.test.standard_tests.ConnectorTestSuiteBase):
 31class SourceTestSuiteBase(ConnectorTestSuiteBase):
 32    """Base class for source test suites.
 33
 34    This class provides a base set of functionality for testing source connectors, and it
 35    inherits all generic connector tests from the `ConnectorTestSuiteBase` class.
 36    """
 37
 38    def test_check(
 39        self,
 40        scenario: ConnectorTestScenario,
 41    ) -> None:
 42        """Run standard `check` tests on the connector.
 43
 44        Assert that the connector returns a single CONNECTION_STATUS message.
 45        This test is designed to validate the connector's ability to establish a connection
 46        and return its status with the expected message type.
 47        """
 48        result: entrypoint_wrapper.EntrypointOutput = run_test_job(
 49            self.create_connector(scenario),
 50            "check",
 51            test_scenario=scenario,
 52            connector_root=self.get_connector_root_dir(),
 53        )
 54        num_status_messages = len(result.connection_status_messages)
 55        assert num_status_messages == 1, (
 56            f"Expected exactly one CONNECTION_STATUS message. Got {num_status_messages}: \n"
 57            + "\n".join([str(m) for m in result.get_message_iterator()])
 58        )
 59
 60    def test_discover(
 61        self,
 62        scenario: ConnectorTestScenario,
 63    ) -> None:
 64        """Standard test for `discover`."""
 65        if scenario.expected_outcome.expect_exception():
 66            # If the scenario expects an exception, we can't ensure it specifically would fail
 67            # in discover, because some discover implementations do not need to make a connection.
 68            # We skip this test in that case.
 69            pytest.skip("Skipping discover test for scenario that expects an exception.")
 70            return
 71
 72        run_test_job(
 73            self.create_connector(scenario),
 74            "discover",
 75            connector_root=self.get_connector_root_dir(),
 76            test_scenario=scenario,
 77        )
 78
 79    def test_spec(self) -> None:
 80        """Standard test for `spec`.
 81
 82        This test does not require a `scenario` input, since `spec`
 83        does not require any inputs.
 84
 85        We assume `spec` should always succeed and it should always generate
 86        a valid `SPEC` message.
 87
 88        Note: the parsing of messages by type also implicitly validates that
 89        the generated `SPEC` message is valid JSON.
 90        """
 91        result = run_test_job(
 92            verb="spec",
 93            test_scenario=None,
 94            connector=self.create_connector(scenario=None),
 95            connector_root=self.get_connector_root_dir(),
 96        )
 97        # If an error occurs, it will be raised above.
 98
 99        assert len(result.spec_messages) == 1, (
100            "Expected exactly 1 spec message but got {len(result.spec_messages)}",
101            result.errors,
102        )
103
104    def test_basic_read(
105        self,
106        scenario: ConnectorTestScenario,
107    ) -> None:
108        """Run standard `read` test on the connector.
109
110        This test is designed to validate the connector's ability to read data
111        from the source and return records. It first runs a `discover` job to
112        obtain the catalog of streams, and then it runs a `read` job to fetch
113        records from those streams.
114        """
115        discover_result = run_test_job(
116            self.create_connector(scenario),
117            "discover",
118            connector_root=self.get_connector_root_dir(),
119            test_scenario=scenario.without_expected_outcome(),
120        )
121        if scenario.expected_outcome.expect_exception() and discover_result.errors:
122            # Failed as expected; we're done.
123            return
124        streams = discover_result.catalog.catalog.streams  # type: ignore [reportOptionalMemberAccess, union-attr]
125
126        if scenario.empty_streams:
127            # Filter out streams marked as empty in the scenario.
128            empty_stream_names = [stream.name for stream in scenario.empty_streams]
129            streams = [s for s in streams if s.name not in empty_stream_names]
130
131        configured_catalog = ConfiguredAirbyteCatalog(
132            streams=[
133                ConfiguredAirbyteStream(
134                    stream=stream,
135                    sync_mode=SyncMode.full_refresh,
136                    destination_sync_mode=DestinationSyncMode.append_dedup,
137                )
138                for stream in streams
139            ]
140        )
141        result = run_test_job(
142            self.create_connector(scenario),
143            "read",
144            test_scenario=scenario,
145            connector_root=self.get_connector_root_dir(),
146            catalog=configured_catalog,
147        )
148
149        if scenario.expected_outcome.expect_success() and not result.records:
150            raise AssertionError("Expected records but got none.")
151
152    def test_fail_read_with_bad_catalog(
153        self,
154        scenario: ConnectorTestScenario,
155    ) -> None:
156        """Standard test for `read` when passed a bad catalog file."""
157        invalid_configured_catalog = ConfiguredAirbyteCatalog(
158            streams=[
159                # Create ConfiguredAirbyteStream which is deliberately invalid
160                # with regard to the Airbyte Protocol.
161                # This should cause the connector to fail.
162                ConfiguredAirbyteStream(
163                    stream=AirbyteStream(
164                        name="__AIRBYTE__stream_that_does_not_exist",
165                        json_schema={
166                            "type": "object",
167                            "properties": {"f1": {"type": "string"}},
168                        },
169                        supported_sync_modes=[SyncMode.full_refresh],
170                    ),
171                    sync_mode="INVALID",  # type: ignore [reportArgumentType]
172                    destination_sync_mode="INVALID",  # type: ignore [reportArgumentType]
173                ),
174            ],
175        )
176        result: entrypoint_wrapper.EntrypointOutput = run_test_job(
177            self.create_connector(scenario),
178            "read",
179            connector_root=self.get_connector_root_dir(),
180            test_scenario=scenario.with_expecting_failure(),  # Expect failure due to bad catalog
181            catalog=asdict(invalid_configured_catalog),
182        )
183        assert result.errors, "Expected errors but got none."
184        assert result.trace_messages, "Expected trace messages but got none."

Base class for source test suites.

This class provides a base set of functionality for testing source connectors, and it inherits all generic connector tests from the ConnectorTestSuiteBase class.

def test_check( self, scenario: ConnectorTestScenario) -> None:
38    def test_check(
39        self,
40        scenario: ConnectorTestScenario,
41    ) -> None:
42        """Run standard `check` tests on the connector.
43
44        Assert that the connector returns a single CONNECTION_STATUS message.
45        This test is designed to validate the connector's ability to establish a connection
46        and return its status with the expected message type.
47        """
48        result: entrypoint_wrapper.EntrypointOutput = run_test_job(
49            self.create_connector(scenario),
50            "check",
51            test_scenario=scenario,
52            connector_root=self.get_connector_root_dir(),
53        )
54        num_status_messages = len(result.connection_status_messages)
55        assert num_status_messages == 1, (
56            f"Expected exactly one CONNECTION_STATUS message. Got {num_status_messages}: \n"
57            + "\n".join([str(m) for m in result.get_message_iterator()])
58        )

Run standard check tests on the connector.

Assert that the connector returns a single CONNECTION_STATUS message. This test is designed to validate the connector's ability to establish a connection and return its status with the expected message type.

def test_discover( self, scenario: ConnectorTestScenario) -> None:
60    def test_discover(
61        self,
62        scenario: ConnectorTestScenario,
63    ) -> None:
64        """Standard test for `discover`."""
65        if scenario.expected_outcome.expect_exception():
66            # If the scenario expects an exception, we can't ensure it specifically would fail
67            # in discover, because some discover implementations do not need to make a connection.
68            # We skip this test in that case.
69            pytest.skip("Skipping discover test for scenario that expects an exception.")
70            return
71
72        run_test_job(
73            self.create_connector(scenario),
74            "discover",
75            connector_root=self.get_connector_root_dir(),
76            test_scenario=scenario,
77        )

Standard test for discover.

def test_spec(self) -> None:
 79    def test_spec(self) -> None:
 80        """Standard test for `spec`.
 81
 82        This test does not require a `scenario` input, since `spec`
 83        does not require any inputs.
 84
 85        We assume `spec` should always succeed and it should always generate
 86        a valid `SPEC` message.
 87
 88        Note: the parsing of messages by type also implicitly validates that
 89        the generated `SPEC` message is valid JSON.
 90        """
 91        result = run_test_job(
 92            verb="spec",
 93            test_scenario=None,
 94            connector=self.create_connector(scenario=None),
 95            connector_root=self.get_connector_root_dir(),
 96        )
 97        # If an error occurs, it will be raised above.
 98
 99        assert len(result.spec_messages) == 1, (
100            "Expected exactly 1 spec message but got {len(result.spec_messages)}",
101            result.errors,
102        )

Standard test for spec.

This test does not require a scenario input, since spec does not require any inputs.

We assume spec should always succeed and it should always generate a valid SPEC message.

Note: the parsing of messages by type also implicitly validates that the generated SPEC message is valid JSON.

def test_basic_read( self, scenario: ConnectorTestScenario) -> None:
104    def test_basic_read(
105        self,
106        scenario: ConnectorTestScenario,
107    ) -> None:
108        """Run standard `read` test on the connector.
109
110        This test is designed to validate the connector's ability to read data
111        from the source and return records. It first runs a `discover` job to
112        obtain the catalog of streams, and then it runs a `read` job to fetch
113        records from those streams.
114        """
115        discover_result = run_test_job(
116            self.create_connector(scenario),
117            "discover",
118            connector_root=self.get_connector_root_dir(),
119            test_scenario=scenario.without_expected_outcome(),
120        )
121        if scenario.expected_outcome.expect_exception() and discover_result.errors:
122            # Failed as expected; we're done.
123            return
124        streams = discover_result.catalog.catalog.streams  # type: ignore [reportOptionalMemberAccess, union-attr]
125
126        if scenario.empty_streams:
127            # Filter out streams marked as empty in the scenario.
128            empty_stream_names = [stream.name for stream in scenario.empty_streams]
129            streams = [s for s in streams if s.name not in empty_stream_names]
130
131        configured_catalog = ConfiguredAirbyteCatalog(
132            streams=[
133                ConfiguredAirbyteStream(
134                    stream=stream,
135                    sync_mode=SyncMode.full_refresh,
136                    destination_sync_mode=DestinationSyncMode.append_dedup,
137                )
138                for stream in streams
139            ]
140        )
141        result = run_test_job(
142            self.create_connector(scenario),
143            "read",
144            test_scenario=scenario,
145            connector_root=self.get_connector_root_dir(),
146            catalog=configured_catalog,
147        )
148
149        if scenario.expected_outcome.expect_success() and not result.records:
150            raise AssertionError("Expected records but got none.")

Run standard read test on the connector.

This test is designed to validate the connector's ability to read data from the source and return records. It first runs a discover job to obtain the catalog of streams, and then it runs a read job to fetch records from those streams.

def test_fail_read_with_bad_catalog( self, scenario: ConnectorTestScenario) -> None:
152    def test_fail_read_with_bad_catalog(
153        self,
154        scenario: ConnectorTestScenario,
155    ) -> None:
156        """Standard test for `read` when passed a bad catalog file."""
157        invalid_configured_catalog = ConfiguredAirbyteCatalog(
158            streams=[
159                # Create ConfiguredAirbyteStream which is deliberately invalid
160                # with regard to the Airbyte Protocol.
161                # This should cause the connector to fail.
162                ConfiguredAirbyteStream(
163                    stream=AirbyteStream(
164                        name="__AIRBYTE__stream_that_does_not_exist",
165                        json_schema={
166                            "type": "object",
167                            "properties": {"f1": {"type": "string"}},
168                        },
169                        supported_sync_modes=[SyncMode.full_refresh],
170                    ),
171                    sync_mode="INVALID",  # type: ignore [reportArgumentType]
172                    destination_sync_mode="INVALID",  # type: ignore [reportArgumentType]
173                ),
174            ],
175        )
176        result: entrypoint_wrapper.EntrypointOutput = run_test_job(
177            self.create_connector(scenario),
178            "read",
179            connector_root=self.get_connector_root_dir(),
180            test_scenario=scenario.with_expecting_failure(),  # Expect failure due to bad catalog
181            catalog=asdict(invalid_configured_catalog),
182        )
183        assert result.errors, "Expected errors but got none."
184        assert result.trace_messages, "Expected trace messages but got none."

Standard test for read when passed a bad catalog file.