airbyte_cdk.test.standard_tests
FAST Airbyte Standard Tests
This module provides a set of base classes for declarative connector test suites. The goal of this module is to provide a robust and extensible framework for testing Airbyte connectors.
Example usage:
# `test_airbyte_standards.py`
from airbyte_cdk.test import standard_tests
pytest_plugins = [
"airbyte_cdk.test.standard_tests.pytest_hooks",
]
class TestSuiteSourcePokeAPI(standard_tests.DeclarativeSourceTestSuite):
"""Test suite for the source."""
Available test suites base classes:
DeclarativeSourceTestSuite
: A test suite for declarative sources.SourceTestSuiteBase
: A test suite for sources.DestinationTestSuiteBase
: A test suite for destinations.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2'''FAST Airbyte Standard Tests 3 4This module provides a set of base classes for declarative connector test suites. 5The goal of this module is to provide a robust and extensible framework for testing Airbyte 6connectors. 7 8Example usage: 9 10```python 11# `test_airbyte_standards.py` 12from airbyte_cdk.test import standard_tests 13 14pytest_plugins = [ 15 "airbyte_cdk.test.standard_tests.pytest_hooks", 16] 17 18 19class TestSuiteSourcePokeAPI(standard_tests.DeclarativeSourceTestSuite): 20 """Test suite for the source.""" 21``` 22 23Available test suites base classes: 24- `DeclarativeSourceTestSuite`: A test suite for declarative sources. 25- `SourceTestSuiteBase`: A test suite for sources. 26- `DestinationTestSuiteBase`: A test suite for destinations. 27 28''' 29 30from airbyte_cdk.test.models.scenario import ConnectorTestScenario 31from airbyte_cdk.test.standard_tests.connector_base import ConnectorTestSuiteBase 32from airbyte_cdk.test.standard_tests.declarative_sources import ( 33 DeclarativeSourceTestSuite, 34) 35from airbyte_cdk.test.standard_tests.destination_base import DestinationTestSuiteBase 36from airbyte_cdk.test.standard_tests.source_base import SourceTestSuiteBase 37 38__all__ = [ 39 "ConnectorTestScenario", 40 "ConnectorTestSuiteBase", 41 "DeclarativeSourceTestSuite", 42 "DestinationTestSuiteBase", 43 "SourceTestSuiteBase", 44]
28class ConnectorTestScenario(BaseModel): 29 """Acceptance test scenario, as a Pydantic model. 30 31 This class represents an acceptance test scenario, which is a single test case 32 that can be run against a connector. It is used to deserialize and validate the 33 acceptance test configuration file. 34 """ 35 36 # Allows the class to be hashable, which PyTest will require 37 # when we use to parameterize tests. 38 model_config = ConfigDict(frozen=True) 39 40 class AcceptanceTestExpectRecords(BaseModel): 41 path: Path 42 exact_order: bool = False 43 44 class AcceptanceTestFileTypes(BaseModel): 45 skip_test: bool 46 bypass_reason: str 47 48 class AcceptanceTestEmptyStream(BaseModel): 49 name: str 50 bypass_reason: str | None = None 51 52 # bypass reason does not affect equality 53 def __hash__(self) -> int: 54 return hash(self.name) 55 56 config_path: Path | None = None 57 config_dict: dict[str, Any] | None = None 58 59 _id: str | None = None # Used to override the default ID generation 60 61 configured_catalog_path: Path | None = None 62 empty_streams: list[AcceptanceTestEmptyStream] | None = None 63 timeout_seconds: int | None = None 64 expect_records: AcceptanceTestExpectRecords | None = None 65 file_types: AcceptanceTestFileTypes | None = None 66 status: Literal["succeed", "failed", "exception"] | None = None 67 68 def get_config_dict( 69 self, 70 *, 71 connector_root: Path, 72 empty_if_missing: bool, 73 ) -> dict[str, Any]: 74 """Return the config dictionary. 75 76 If a config dictionary has already been loaded, return it. Otherwise, load 77 the config file and return the dictionary. 78 79 If `self.config_dict` and `self.config_path` are both `None`: 80 - return an empty dictionary if `empty_if_missing` is True 81 - raise a ValueError if `empty_if_missing` is False 82 """ 83 if self.config_dict is not None: 84 return self.config_dict 85 86 if self.config_path is not None: 87 config_path = self.config_path 88 if not config_path.is_absolute(): 89 # We usually receive a relative path here. Let's resolve it. 90 config_path = (connector_root / self.config_path).resolve().absolute() 91 92 return cast( 93 dict[str, Any], 94 yaml.safe_load(config_path.read_text()), 95 ) 96 97 if empty_if_missing: 98 return {} 99 100 raise ValueError("No config dictionary or path provided.") 101 102 @property 103 def expected_outcome(self) -> ExpectedOutcome: 104 """Whether the test scenario expects an exception to be raised. 105 106 Returns True if the scenario expects an exception, False if it does not, 107 and None if there is no set expectation. 108 """ 109 return ExpectedOutcome.from_status_str(self.status) 110 111 @property 112 def id(self) -> str: 113 """Return a unique identifier for the test scenario. 114 115 This is used by PyTest to identify the test scenario. 116 """ 117 if self._id: 118 return self._id 119 120 if self.config_path: 121 return self.config_path.stem 122 123 return str(hash(self)) 124 125 def __str__(self) -> str: 126 return f"'{self.id}' Test Scenario" 127 128 @contextmanager 129 def with_temp_config_file( 130 self, 131 connector_root: Path, 132 ) -> Generator[Path, None, None]: 133 """Yield a temporary JSON file path containing the config dict and delete it on exit.""" 134 config = self.get_config_dict( 135 empty_if_missing=True, 136 connector_root=connector_root, 137 ) 138 with tempfile.NamedTemporaryFile( 139 prefix="config-", 140 suffix=".json", 141 mode="w", 142 delete=False, # Don't fail if cannot delete the file on exit 143 encoding="utf-8", 144 ) as temp_file: 145 temp_file.write(json.dumps(config)) 146 temp_file.flush() 147 # Allow the file to be read by other processes 148 temp_path = Path(temp_file.name) 149 temp_path.chmod(temp_path.stat().st_mode | 0o444) 150 yield temp_path 151 152 # attempt cleanup, ignore errors 153 with suppress(OSError): 154 temp_path.unlink() 155 156 def without_expected_outcome(self) -> ConnectorTestScenario: 157 """Return a copy of the scenario that does not expect failure or success. 158 159 This is useful when running multiple steps, to defer the expectations to a later step. 160 """ 161 return ConnectorTestScenario( 162 **self.model_dump(exclude={"status"}), 163 ) 164 165 def with_expecting_failure(self) -> ConnectorTestScenario: 166 """Return a copy of the scenario that expects failure. 167 168 This is useful when deriving new scenarios from existing ones. 169 """ 170 if self.status == "failed": 171 return self 172 173 return ConnectorTestScenario( 174 **self.model_dump(exclude={"status"}), 175 status="failed", 176 ) 177 178 def with_expecting_success(self) -> ConnectorTestScenario: 179 """Return a copy of the scenario that expects success. 180 181 This is useful when deriving new scenarios from existing ones. 182 """ 183 if self.status == "succeed": 184 return self 185 186 return ConnectorTestScenario( 187 **self.model_dump(exclude={"status"}), 188 status="succeed", 189 ) 190 191 @property 192 def requires_creds(self) -> bool: 193 """Return True if the scenario requires credentials to run.""" 194 return bool(self.config_path and "secrets" in self.config_path.parts)
Acceptance test scenario, as a Pydantic model.
This class represents an acceptance test scenario, which is a single test case that can be run against a connector. It is used to deserialize and validate the acceptance test configuration file.
Configuration for the model, should be a dictionary conforming to [ConfigDict
][pydantic.config.ConfigDict].
68 def get_config_dict( 69 self, 70 *, 71 connector_root: Path, 72 empty_if_missing: bool, 73 ) -> dict[str, Any]: 74 """Return the config dictionary. 75 76 If a config dictionary has already been loaded, return it. Otherwise, load 77 the config file and return the dictionary. 78 79 If `self.config_dict` and `self.config_path` are both `None`: 80 - return an empty dictionary if `empty_if_missing` is True 81 - raise a ValueError if `empty_if_missing` is False 82 """ 83 if self.config_dict is not None: 84 return self.config_dict 85 86 if self.config_path is not None: 87 config_path = self.config_path 88 if not config_path.is_absolute(): 89 # We usually receive a relative path here. Let's resolve it. 90 config_path = (connector_root / self.config_path).resolve().absolute() 91 92 return cast( 93 dict[str, Any], 94 yaml.safe_load(config_path.read_text()), 95 ) 96 97 if empty_if_missing: 98 return {} 99 100 raise ValueError("No config dictionary or path provided.")
Return the config dictionary.
If a config dictionary has already been loaded, return it. Otherwise, load the config file and return the dictionary.
If self.config_dict
and self.config_path
are both None
:
- return an empty dictionary if
empty_if_missing
is True - raise a ValueError if
empty_if_missing
is False
102 @property 103 def expected_outcome(self) -> ExpectedOutcome: 104 """Whether the test scenario expects an exception to be raised. 105 106 Returns True if the scenario expects an exception, False if it does not, 107 and None if there is no set expectation. 108 """ 109 return ExpectedOutcome.from_status_str(self.status)
Whether the test scenario expects an exception to be raised.
Returns True if the scenario expects an exception, False if it does not, and None if there is no set expectation.
111 @property 112 def id(self) -> str: 113 """Return a unique identifier for the test scenario. 114 115 This is used by PyTest to identify the test scenario. 116 """ 117 if self._id: 118 return self._id 119 120 if self.config_path: 121 return self.config_path.stem 122 123 return str(hash(self))
Return a unique identifier for the test scenario.
This is used by PyTest to identify the test scenario.
128 @contextmanager 129 def with_temp_config_file( 130 self, 131 connector_root: Path, 132 ) -> Generator[Path, None, None]: 133 """Yield a temporary JSON file path containing the config dict and delete it on exit.""" 134 config = self.get_config_dict( 135 empty_if_missing=True, 136 connector_root=connector_root, 137 ) 138 with tempfile.NamedTemporaryFile( 139 prefix="config-", 140 suffix=".json", 141 mode="w", 142 delete=False, # Don't fail if cannot delete the file on exit 143 encoding="utf-8", 144 ) as temp_file: 145 temp_file.write(json.dumps(config)) 146 temp_file.flush() 147 # Allow the file to be read by other processes 148 temp_path = Path(temp_file.name) 149 temp_path.chmod(temp_path.stat().st_mode | 0o444) 150 yield temp_path 151 152 # attempt cleanup, ignore errors 153 with suppress(OSError): 154 temp_path.unlink()
Yield a temporary JSON file path containing the config dict and delete it on exit.
156 def without_expected_outcome(self) -> ConnectorTestScenario: 157 """Return a copy of the scenario that does not expect failure or success. 158 159 This is useful when running multiple steps, to defer the expectations to a later step. 160 """ 161 return ConnectorTestScenario( 162 **self.model_dump(exclude={"status"}), 163 )
Return a copy of the scenario that does not expect failure or success.
This is useful when running multiple steps, to defer the expectations to a later step.
165 def with_expecting_failure(self) -> ConnectorTestScenario: 166 """Return a copy of the scenario that expects failure. 167 168 This is useful when deriving new scenarios from existing ones. 169 """ 170 if self.status == "failed": 171 return self 172 173 return ConnectorTestScenario( 174 **self.model_dump(exclude={"status"}), 175 status="failed", 176 )
Return a copy of the scenario that expects failure.
This is useful when deriving new scenarios from existing ones.
178 def with_expecting_success(self) -> ConnectorTestScenario: 179 """Return a copy of the scenario that expects success. 180 181 This is useful when deriving new scenarios from existing ones. 182 """ 183 if self.status == "succeed": 184 return self 185 186 return ConnectorTestScenario( 187 **self.model_dump(exclude={"status"}), 188 status="succeed", 189 )
Return a copy of the scenario that expects success.
This is useful when deriving new scenarios from existing ones.
191 @property 192 def requires_creds(self) -> bool: 193 """Return True if the scenario requires credentials to run.""" 194 return bool(self.config_path and "secrets" in self.config_path.parts)
Return True if the scenario requires credentials to run.
328def init_private_attributes(self: BaseModel, context: Any, /) -> None: 329 """This function is meant to behave like a BaseModel method to initialise private attributes. 330 331 It takes context as an argument since that's what pydantic-core passes when calling it. 332 333 Args: 334 self: The BaseModel instance. 335 context: The context. 336 """ 337 if getattr(self, '__pydantic_private__', None) is None: 338 pydantic_private = {} 339 for name, private_attr in self.__private_attributes__.items(): 340 default = private_attr.get_default() 341 if default is not PydanticUndefined: 342 pydantic_private[name] = default 343 object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Usage docs: https://docs.pydantic.dev/2.10/concepts/models/
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized
__init__
[Signature
][inspect.Signature] of the model. - __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom
__init__
function. - __pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces
Model.__validators__
andModel.__root_validators__
from Pydantic V1. - __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [
RootModel
][pydantic.root_model.RootModel]. - __pydantic_serializer__: The
pydantic-core
SchemaSerializer
used to dump instances of the model. - __pydantic_validator__: The
pydantic-core
SchemaValidator
used to validate instances of the model. - __pydantic_fields__: A dictionary of field names and their corresponding [
FieldInfo
][pydantic.fields.FieldInfo] objects. - __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [
ComputedFieldInfo
][pydantic.fields.ComputedFieldInfo] objects. - __pydantic_extra__: A dictionary containing extra values, if [
extra
][pydantic.config.ConfigDict.extra] is set to'allow'
. - __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
Usage docs: https://docs.pydantic.dev/2.10/concepts/models/
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized
__init__
[Signature
][inspect.Signature] of the model. - __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom
__init__
function. - __pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces
Model.__validators__
andModel.__root_validators__
from Pydantic V1. - __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [
RootModel
][pydantic.root_model.RootModel]. - __pydantic_serializer__: The
pydantic-core
SchemaSerializer
used to dump instances of the model. - __pydantic_validator__: The
pydantic-core
SchemaValidator
used to validate instances of the model. - __pydantic_fields__: A dictionary of field names and their corresponding [
FieldInfo
][pydantic.fields.FieldInfo] objects. - __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [
ComputedFieldInfo
][pydantic.fields.ComputedFieldInfo] objects. - __pydantic_extra__: A dictionary containing extra values, if [
extra
][pydantic.config.ConfigDict.extra] is set to'allow'
. - __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
48 class AcceptanceTestEmptyStream(BaseModel): 49 name: str 50 bypass_reason: str | None = None 51 52 # bypass reason does not affect equality 53 def __hash__(self) -> int: 54 return hash(self.name)
Usage docs: https://docs.pydantic.dev/2.10/concepts/models/
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized
__init__
[Signature
][inspect.Signature] of the model. - __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom
__init__
function. - __pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces
Model.__validators__
andModel.__root_validators__
from Pydantic V1. - __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [
RootModel
][pydantic.root_model.RootModel]. - __pydantic_serializer__: The
pydantic-core
SchemaSerializer
used to dump instances of the model. - __pydantic_validator__: The
pydantic-core
SchemaValidator
used to validate instances of the model. - __pydantic_fields__: A dictionary of field names and their corresponding [
FieldInfo
][pydantic.fields.FieldInfo] objects. - __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [
ComputedFieldInfo
][pydantic.fields.ComputedFieldInfo] objects. - __pydantic_extra__: A dictionary containing extra values, if [
extra
][pydantic.config.ConfigDict.extra] is set to'allow'
. - __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
27class ConnectorTestSuiteBase(DockerConnectorTestSuite): 28 """Base class for Python connector test suites.""" 29 30 connector: type[IConnector] | Callable[[], IConnector] | None # type: ignore [reportRedeclaration] 31 """The connector class or a factory function that returns an scenario of IConnector.""" 32 33 @classproperty # type: ignore [no-redef] 34 def connector(cls) -> type[IConnector] | Callable[[], IConnector] | None: 35 """Get the connector class for the test suite. 36 37 This assumes a python connector and should be overridden by subclasses to provide the 38 specific connector class to be tested. 39 """ 40 connector_root = cls.get_connector_root_dir() 41 connector_name = cls.connector_name 42 43 expected_module_name = connector_name.replace("-", "_").lower() 44 expected_class_name = connector_name.replace("-", "_").title().replace("_", "") 45 46 # dynamically import and get the connector class: <expected_module_name>.<expected_class_name> 47 48 cwd_snapshot = Path().absolute() 49 os.chdir(connector_root) 50 51 # Dynamically import the module 52 try: 53 module = importlib.import_module(expected_module_name) 54 except ModuleNotFoundError as e: 55 raise ImportError( 56 f"Could not import module '{expected_module_name}'. " 57 "Please ensure you are running from within the connector's virtual environment, " 58 "for instance by running `poetry run airbyte-cdk connector test` from the " 59 "connector directory. If the issue persists, check that the connector " 60 f"module matches the expected module name '{expected_module_name}' and that the " 61 f"connector class matches the expected class name '{expected_class_name}'. " 62 "Alternatively, you can run `airbyte-cdk image test` to run a subset of tests " 63 "against the connector's image." 64 ) from e 65 finally: 66 # Change back to the original working directory 67 os.chdir(cwd_snapshot) 68 69 # Dynamically get the class from the module 70 try: 71 return cast(type[IConnector], getattr(module, expected_class_name)) 72 except AttributeError as e: 73 # We did not find it based on our expectations, so let's check if we can find it 74 # with a case-insensitive match. 75 matching_class_name = next( 76 (name for name in dir(module) if name.lower() == expected_class_name.lower()), 77 None, 78 ) 79 if not matching_class_name: 80 raise ImportError( 81 f"Module '{expected_module_name}' does not have a class named '{expected_class_name}'." 82 ) from e 83 return cast(type[IConnector], getattr(module, matching_class_name)) 84 85 @classmethod 86 def create_connector( 87 cls, 88 scenario: ConnectorTestScenario | None, 89 ) -> IConnector: 90 """Instantiate the connector class.""" 91 connector = cls.connector # type: ignore 92 if connector: 93 if callable(connector) or isinstance(connector, type): 94 # If the connector is a class or factory function, instantiate it: 95 return cast(IConnector, connector()) # type: ignore [redundant-cast] 96 97 # Otherwise, we can't instantiate the connector. Fail with a clear error message. 98 raise NotImplementedError( 99 "No connector class or connector factory function provided. " 100 "Please provide a class or factory function in `cls.connector`, or " 101 "override `cls.create_connector()` to define a custom initialization process." 102 ) 103 104 # Test Definitions 105 106 def test_check( 107 self, 108 scenario: ConnectorTestScenario, 109 ) -> None: 110 """Run `connection` acceptance tests.""" 111 result: entrypoint_wrapper.EntrypointOutput = run_test_job( 112 self.create_connector(scenario), 113 "check", 114 test_scenario=scenario, 115 connector_root=self.get_connector_root_dir(), 116 ) 117 assert len(result.connection_status_messages) == 1, ( 118 f"Expected exactly one CONNECTION_STATUS message. " 119 "Got: {result.connection_status_messages!s}" 120 )
Base class for Python connector test suites.
The connector class or a factory function that returns an scenario of IConnector.
85 @classmethod 86 def create_connector( 87 cls, 88 scenario: ConnectorTestScenario | None, 89 ) -> IConnector: 90 """Instantiate the connector class.""" 91 connector = cls.connector # type: ignore 92 if connector: 93 if callable(connector) or isinstance(connector, type): 94 # If the connector is a class or factory function, instantiate it: 95 return cast(IConnector, connector()) # type: ignore [redundant-cast] 96 97 # Otherwise, we can't instantiate the connector. Fail with a clear error message. 98 raise NotImplementedError( 99 "No connector class or connector factory function provided. " 100 "Please provide a class or factory function in `cls.connector`, or " 101 "override `cls.create_connector()` to define a custom initialization process." 102 )
Instantiate the connector class.
106 def test_check( 107 self, 108 scenario: ConnectorTestScenario, 109 ) -> None: 110 """Run `connection` acceptance tests.""" 111 result: entrypoint_wrapper.EntrypointOutput = run_test_job( 112 self.create_connector(scenario), 113 "check", 114 test_scenario=scenario, 115 connector_root=self.get_connector_root_dir(), 116 ) 117 assert len(result.connection_status_messages) == 1, ( 118 f"Expected exactly one CONNECTION_STATUS message. " 119 "Got: {result.connection_status_messages!s}" 120 )
Run connection
acceptance tests.
Inherited Members
28class DeclarativeSourceTestSuite(SourceTestSuiteBase): 29 """Declarative source test suite. 30 31 This inherits from the Python-based source test suite and implements the 32 `create_connector` method to create a declarative source object instead of 33 requiring a custom Python source object. 34 35 The class also automatically locates the `manifest.yaml` file and the 36 `components.py` file (if it exists) in the connector's directory. 37 """ 38 39 connector: type[IConnector] | None = None 40 41 @classproperty 42 def manifest_yaml_path(cls) -> Path: 43 """Get the path to the manifest.yaml file.""" 44 result = cls.get_connector_root_dir() / MANIFEST_YAML 45 if result.exists(): 46 return result 47 48 raise FileNotFoundError( 49 f"Manifest YAML file not found at {result}. " 50 "Please ensure that the test suite is run in the correct directory.", 51 ) 52 53 @classproperty 54 def components_py_path(cls) -> Path | None: 55 """Get the path to the `components.py` file, if one exists. 56 57 If not `components.py` file exists, return None. 58 """ 59 result = cls.get_connector_root_dir() / "components.py" 60 if result.exists(): 61 return result 62 63 return None 64 65 @classmethod 66 def create_connector( 67 cls, 68 scenario: ConnectorTestScenario | None, 69 ) -> IConnector: 70 """Create a connector scenario for the test suite. 71 72 This overrides `create_connector` from the create a declarative source object 73 instead of requiring a custom python source object. 74 75 Subclasses should not need to override this method. 76 """ 77 scenario = scenario or ConnectorTestScenario() # Use default (empty) scenario if None 78 manifest_dict = yaml.safe_load(cls.manifest_yaml_path.read_text()) 79 config = { 80 "__injected_manifest": manifest_dict, 81 } 82 config.update( 83 scenario.get_config_dict( 84 empty_if_missing=True, 85 connector_root=cls.get_connector_root_dir(), 86 ), 87 ) 88 89 if cls.components_py_path and cls.components_py_path.exists(): 90 os.environ["AIRBYTE_ENABLE_UNSAFE_CODE"] = "true" 91 config["__injected_components_py"] = cls.components_py_path.read_text() 92 config["__injected_components_py_checksums"] = { 93 "md5": md5_checksum(cls.components_py_path), 94 } 95 96 return cast( 97 IConnector, 98 ConcurrentDeclarativeSource( 99 config=config, 100 catalog=None, 101 state=None, 102 source_config=manifest_dict, 103 ), 104 )
Declarative source test suite.
This inherits from the Python-based source test suite and implements the
create_connector
method to create a declarative source object instead of
requiring a custom Python source object.
The class also automatically locates the manifest.yaml
file and the
components.py
file (if it exists) in the connector's directory.
The connector class or a factory function that returns an scenario of IConnector.
Much like a property
, but the wrapped get function is a
class method. For simplicity, only read-only properties are
implemented.
Much like a property
, but the wrapped get function is a
class method. For simplicity, only read-only properties are
implemented.
65 @classmethod 66 def create_connector( 67 cls, 68 scenario: ConnectorTestScenario | None, 69 ) -> IConnector: 70 """Create a connector scenario for the test suite. 71 72 This overrides `create_connector` from the create a declarative source object 73 instead of requiring a custom python source object. 74 75 Subclasses should not need to override this method. 76 """ 77 scenario = scenario or ConnectorTestScenario() # Use default (empty) scenario if None 78 manifest_dict = yaml.safe_load(cls.manifest_yaml_path.read_text()) 79 config = { 80 "__injected_manifest": manifest_dict, 81 } 82 config.update( 83 scenario.get_config_dict( 84 empty_if_missing=True, 85 connector_root=cls.get_connector_root_dir(), 86 ), 87 ) 88 89 if cls.components_py_path and cls.components_py_path.exists(): 90 os.environ["AIRBYTE_ENABLE_UNSAFE_CODE"] = "true" 91 config["__injected_components_py"] = cls.components_py_path.read_text() 92 config["__injected_components_py_checksums"] = { 93 "md5": md5_checksum(cls.components_py_path), 94 } 95 96 return cast( 97 IConnector, 98 ConcurrentDeclarativeSource( 99 config=config, 100 catalog=None, 101 state=None, 102 source_config=manifest_dict, 103 ), 104 )
Create a connector scenario for the test suite.
This overrides create_connector
from the create a declarative source object
instead of requiring a custom python source object.
Subclasses should not need to override this method.
Inherited Members
8class DestinationTestSuiteBase(ConnectorTestSuiteBase): 9 """Base class for destination test suites. 10 11 This class provides a base set of functionality for testing destination connectors, and it 12 inherits all generic connector tests from the `ConnectorTestSuiteBase` class. 13 14 TODO: As of now, this class does not add any additional functionality or tests specific to 15 destination connectors. However, it serves as a placeholder for future enhancements and 16 customizations that may be needed for destination connectors. 17 """
Base class for destination test suites.
This class provides a base set of functionality for testing destination connectors, and it
inherits all generic connector tests from the ConnectorTestSuiteBase
class.
TODO: As of now, this class does not add any additional functionality or tests specific to destination connectors. However, it serves as a placeholder for future enhancements and customizations that may be needed for destination connectors.
Inherited Members
31class SourceTestSuiteBase(ConnectorTestSuiteBase): 32 """Base class for source test suites. 33 34 This class provides a base set of functionality for testing source connectors, and it 35 inherits all generic connector tests from the `ConnectorTestSuiteBase` class. 36 """ 37 38 def test_check( 39 self, 40 scenario: ConnectorTestScenario, 41 ) -> None: 42 """Run standard `check` tests on the connector. 43 44 Assert that the connector returns a single CONNECTION_STATUS message. 45 This test is designed to validate the connector's ability to establish a connection 46 and return its status with the expected message type. 47 """ 48 result: entrypoint_wrapper.EntrypointOutput = run_test_job( 49 self.create_connector(scenario), 50 "check", 51 test_scenario=scenario, 52 connector_root=self.get_connector_root_dir(), 53 ) 54 num_status_messages = len(result.connection_status_messages) 55 assert num_status_messages == 1, ( 56 f"Expected exactly one CONNECTION_STATUS message. Got {num_status_messages}: \n" 57 + "\n".join([str(m) for m in result.get_message_iterator()]) 58 ) 59 60 def test_discover( 61 self, 62 scenario: ConnectorTestScenario, 63 ) -> None: 64 """Standard test for `discover`.""" 65 if scenario.expected_outcome.expect_exception(): 66 # If the scenario expects an exception, we can't ensure it specifically would fail 67 # in discover, because some discover implementations do not need to make a connection. 68 # We skip this test in that case. 69 pytest.skip("Skipping discover test for scenario that expects an exception.") 70 return 71 72 run_test_job( 73 self.create_connector(scenario), 74 "discover", 75 connector_root=self.get_connector_root_dir(), 76 test_scenario=scenario, 77 ) 78 79 def test_spec(self) -> None: 80 """Standard test for `spec`. 81 82 This test does not require a `scenario` input, since `spec` 83 does not require any inputs. 84 85 We assume `spec` should always succeed and it should always generate 86 a valid `SPEC` message. 87 88 Note: the parsing of messages by type also implicitly validates that 89 the generated `SPEC` message is valid JSON. 90 """ 91 result = run_test_job( 92 verb="spec", 93 test_scenario=None, 94 connector=self.create_connector(scenario=None), 95 connector_root=self.get_connector_root_dir(), 96 ) 97 # If an error occurs, it will be raised above. 98 99 assert len(result.spec_messages) == 1, ( 100 "Expected exactly 1 spec message but got {len(result.spec_messages)}", 101 result.errors, 102 ) 103 104 def test_basic_read( 105 self, 106 scenario: ConnectorTestScenario, 107 ) -> None: 108 """Run standard `read` test on the connector. 109 110 This test is designed to validate the connector's ability to read data 111 from the source and return records. It first runs a `discover` job to 112 obtain the catalog of streams, and then it runs a `read` job to fetch 113 records from those streams. 114 """ 115 discover_result = run_test_job( 116 self.create_connector(scenario), 117 "discover", 118 connector_root=self.get_connector_root_dir(), 119 test_scenario=scenario.without_expected_outcome(), 120 ) 121 if scenario.expected_outcome.expect_exception() and discover_result.errors: 122 # Failed as expected; we're done. 123 return 124 streams = discover_result.catalog.catalog.streams # type: ignore [reportOptionalMemberAccess, union-attr] 125 126 if scenario.empty_streams: 127 # Filter out streams marked as empty in the scenario. 128 empty_stream_names = [stream.name for stream in scenario.empty_streams] 129 streams = [s for s in streams if s.name not in empty_stream_names] 130 131 configured_catalog = ConfiguredAirbyteCatalog( 132 streams=[ 133 ConfiguredAirbyteStream( 134 stream=stream, 135 sync_mode=SyncMode.full_refresh, 136 destination_sync_mode=DestinationSyncMode.append_dedup, 137 ) 138 for stream in streams 139 ] 140 ) 141 result = run_test_job( 142 self.create_connector(scenario), 143 "read", 144 test_scenario=scenario, 145 connector_root=self.get_connector_root_dir(), 146 catalog=configured_catalog, 147 ) 148 149 if scenario.expected_outcome.expect_success() and not result.records: 150 raise AssertionError("Expected records but got none.") 151 152 def test_fail_read_with_bad_catalog( 153 self, 154 scenario: ConnectorTestScenario, 155 ) -> None: 156 """Standard test for `read` when passed a bad catalog file.""" 157 invalid_configured_catalog = ConfiguredAirbyteCatalog( 158 streams=[ 159 # Create ConfiguredAirbyteStream which is deliberately invalid 160 # with regard to the Airbyte Protocol. 161 # This should cause the connector to fail. 162 ConfiguredAirbyteStream( 163 stream=AirbyteStream( 164 name="__AIRBYTE__stream_that_does_not_exist", 165 json_schema={ 166 "type": "object", 167 "properties": {"f1": {"type": "string"}}, 168 }, 169 supported_sync_modes=[SyncMode.full_refresh], 170 ), 171 sync_mode="INVALID", # type: ignore [reportArgumentType] 172 destination_sync_mode="INVALID", # type: ignore [reportArgumentType] 173 ), 174 ], 175 ) 176 result: entrypoint_wrapper.EntrypointOutput = run_test_job( 177 self.create_connector(scenario), 178 "read", 179 connector_root=self.get_connector_root_dir(), 180 test_scenario=scenario.with_expecting_failure(), # Expect failure due to bad catalog 181 catalog=asdict(invalid_configured_catalog), 182 ) 183 assert result.errors, "Expected errors but got none." 184 assert result.trace_messages, "Expected trace messages but got none."
Base class for source test suites.
This class provides a base set of functionality for testing source connectors, and it
inherits all generic connector tests from the ConnectorTestSuiteBase
class.
38 def test_check( 39 self, 40 scenario: ConnectorTestScenario, 41 ) -> None: 42 """Run standard `check` tests on the connector. 43 44 Assert that the connector returns a single CONNECTION_STATUS message. 45 This test is designed to validate the connector's ability to establish a connection 46 and return its status with the expected message type. 47 """ 48 result: entrypoint_wrapper.EntrypointOutput = run_test_job( 49 self.create_connector(scenario), 50 "check", 51 test_scenario=scenario, 52 connector_root=self.get_connector_root_dir(), 53 ) 54 num_status_messages = len(result.connection_status_messages) 55 assert num_status_messages == 1, ( 56 f"Expected exactly one CONNECTION_STATUS message. Got {num_status_messages}: \n" 57 + "\n".join([str(m) for m in result.get_message_iterator()]) 58 )
Run standard check
tests on the connector.
Assert that the connector returns a single CONNECTION_STATUS message. This test is designed to validate the connector's ability to establish a connection and return its status with the expected message type.
60 def test_discover( 61 self, 62 scenario: ConnectorTestScenario, 63 ) -> None: 64 """Standard test for `discover`.""" 65 if scenario.expected_outcome.expect_exception(): 66 # If the scenario expects an exception, we can't ensure it specifically would fail 67 # in discover, because some discover implementations do not need to make a connection. 68 # We skip this test in that case. 69 pytest.skip("Skipping discover test for scenario that expects an exception.") 70 return 71 72 run_test_job( 73 self.create_connector(scenario), 74 "discover", 75 connector_root=self.get_connector_root_dir(), 76 test_scenario=scenario, 77 )
Standard test for discover
.
79 def test_spec(self) -> None: 80 """Standard test for `spec`. 81 82 This test does not require a `scenario` input, since `spec` 83 does not require any inputs. 84 85 We assume `spec` should always succeed and it should always generate 86 a valid `SPEC` message. 87 88 Note: the parsing of messages by type also implicitly validates that 89 the generated `SPEC` message is valid JSON. 90 """ 91 result = run_test_job( 92 verb="spec", 93 test_scenario=None, 94 connector=self.create_connector(scenario=None), 95 connector_root=self.get_connector_root_dir(), 96 ) 97 # If an error occurs, it will be raised above. 98 99 assert len(result.spec_messages) == 1, ( 100 "Expected exactly 1 spec message but got {len(result.spec_messages)}", 101 result.errors, 102 )
Standard test for spec
.
This test does not require a scenario
input, since spec
does not require any inputs.
We assume spec
should always succeed and it should always generate
a valid SPEC
message.
Note: the parsing of messages by type also implicitly validates that
the generated SPEC
message is valid JSON.
104 def test_basic_read( 105 self, 106 scenario: ConnectorTestScenario, 107 ) -> None: 108 """Run standard `read` test on the connector. 109 110 This test is designed to validate the connector's ability to read data 111 from the source and return records. It first runs a `discover` job to 112 obtain the catalog of streams, and then it runs a `read` job to fetch 113 records from those streams. 114 """ 115 discover_result = run_test_job( 116 self.create_connector(scenario), 117 "discover", 118 connector_root=self.get_connector_root_dir(), 119 test_scenario=scenario.without_expected_outcome(), 120 ) 121 if scenario.expected_outcome.expect_exception() and discover_result.errors: 122 # Failed as expected; we're done. 123 return 124 streams = discover_result.catalog.catalog.streams # type: ignore [reportOptionalMemberAccess, union-attr] 125 126 if scenario.empty_streams: 127 # Filter out streams marked as empty in the scenario. 128 empty_stream_names = [stream.name for stream in scenario.empty_streams] 129 streams = [s for s in streams if s.name not in empty_stream_names] 130 131 configured_catalog = ConfiguredAirbyteCatalog( 132 streams=[ 133 ConfiguredAirbyteStream( 134 stream=stream, 135 sync_mode=SyncMode.full_refresh, 136 destination_sync_mode=DestinationSyncMode.append_dedup, 137 ) 138 for stream in streams 139 ] 140 ) 141 result = run_test_job( 142 self.create_connector(scenario), 143 "read", 144 test_scenario=scenario, 145 connector_root=self.get_connector_root_dir(), 146 catalog=configured_catalog, 147 ) 148 149 if scenario.expected_outcome.expect_success() and not result.records: 150 raise AssertionError("Expected records but got none.")
Run standard read
test on the connector.
This test is designed to validate the connector's ability to read data
from the source and return records. It first runs a discover
job to
obtain the catalog of streams, and then it runs a read
job to fetch
records from those streams.
152 def test_fail_read_with_bad_catalog( 153 self, 154 scenario: ConnectorTestScenario, 155 ) -> None: 156 """Standard test for `read` when passed a bad catalog file.""" 157 invalid_configured_catalog = ConfiguredAirbyteCatalog( 158 streams=[ 159 # Create ConfiguredAirbyteStream which is deliberately invalid 160 # with regard to the Airbyte Protocol. 161 # This should cause the connector to fail. 162 ConfiguredAirbyteStream( 163 stream=AirbyteStream( 164 name="__AIRBYTE__stream_that_does_not_exist", 165 json_schema={ 166 "type": "object", 167 "properties": {"f1": {"type": "string"}}, 168 }, 169 supported_sync_modes=[SyncMode.full_refresh], 170 ), 171 sync_mode="INVALID", # type: ignore [reportArgumentType] 172 destination_sync_mode="INVALID", # type: ignore [reportArgumentType] 173 ), 174 ], 175 ) 176 result: entrypoint_wrapper.EntrypointOutput = run_test_job( 177 self.create_connector(scenario), 178 "read", 179 connector_root=self.get_connector_root_dir(), 180 test_scenario=scenario.with_expecting_failure(), # Expect failure due to bad catalog 181 catalog=asdict(invalid_configured_catalog), 182 ) 183 assert result.errors, "Expected errors but got none." 184 assert result.trace_messages, "Expected trace messages but got none."
Standard test for read
when passed a bad catalog file.