airbyte_cdk.test.standard_tests.docker_base

Base class for connector test suites.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Base class for connector test suites."""
  3
  4from __future__ import annotations
  5
  6import inspect
  7import shutil
  8import sys
  9import tempfile
 10import warnings
 11from dataclasses import asdict
 12from pathlib import Path
 13from typing import Any, Literal, cast
 14
 15import orjson
 16import pytest
 17import yaml
 18from boltons.typeutils import classproperty
 19
 20from airbyte_cdk.models import (
 21    AirbyteCatalog,
 22    ConfiguredAirbyteCatalog,
 23    ConfiguredAirbyteStream,
 24    DestinationSyncMode,
 25    SyncMode,
 26)
 27from airbyte_cdk.models.connector_metadata import MetadataFile
 28from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
 29from airbyte_cdk.test.models import ConnectorTestScenario
 30from airbyte_cdk.utils.connector_paths import (
 31    ACCEPTANCE_TEST_CONFIG,
 32    find_connector_root,
 33)
 34from airbyte_cdk.utils.docker import (
 35    build_connector_image,
 36    run_docker_airbyte_command,
 37)
 38
 39
 40class DockerConnectorTestSuite:
 41    """Base class for connector test suites."""
 42
 43    @classmethod
 44    def get_test_class_dir(cls) -> Path:
 45        """Get the file path that contains the class."""
 46        module = sys.modules[cls.__module__]
 47        # Get the directory containing the test file
 48        return Path(inspect.getfile(module)).parent
 49
 50    @classmethod
 51    def get_connector_root_dir(cls) -> Path:
 52        """Get the root directory of the connector."""
 53        return find_connector_root([cls.get_test_class_dir(), Path.cwd()])
 54
 55    @classproperty
 56    def connector_name(self) -> str:
 57        """Get the name of the connector."""
 58        connector_root = self.get_connector_root_dir()
 59        return connector_root.absolute().name
 60
 61    @classmethod
 62    def is_destination_connector(cls) -> bool:
 63        """Check if the connector is a destination."""
 64        return cast(str, cls.connector_name).startswith("destination-")
 65
 66    @classproperty
 67    def acceptance_test_config(cls) -> Any:
 68        """Get the contents of acceptance test config file.
 69
 70        Also perform some basic validation that the file has the expected structure.
 71        """
 72        acceptance_test_config_path = cls.get_connector_root_dir() / ACCEPTANCE_TEST_CONFIG
 73        if not acceptance_test_config_path.exists():
 74            raise FileNotFoundError(
 75                f"Acceptance test config file not found at: {str(acceptance_test_config_path)}"
 76            )
 77
 78        tests_config = yaml.safe_load(acceptance_test_config_path.read_text())
 79
 80        if "acceptance_tests" not in tests_config:
 81            raise ValueError(
 82                f"Acceptance tests config not found in {acceptance_test_config_path}."
 83                f" Found only: {str(tests_config)}."
 84            )
 85        return tests_config
 86
 87    @staticmethod
 88    def _dedup_scenarios(scenarios: list[ConnectorTestScenario]) -> list[ConnectorTestScenario]:
 89        """
 90        For FAST tests, we treat each config as a separate test scenario to run against, whereas CATs defined
 91        a series of more granular scenarios specifying a config_path and empty_streams among other things.
 92
 93        This method deduplicates the CATs scenarios based on their config_path. In doing so, we choose to
 94        take the union of any defined empty_streams, to have high confidence that runnning a read with the
 95        config will not error on the lack of data in the empty streams or lack of permissions to read them.
 96
 97        """
 98        deduped_scenarios: list[ConnectorTestScenario] = []
 99
100        for scenario in scenarios:
101            for existing_scenario in deduped_scenarios:
102                if scenario.config_path == existing_scenario.config_path:
103                    # If a scenario with the same config_path already exists, we merge the empty streams.
104                    # scenarios are immutable, so we create a new one.
105                    all_empty_streams = (existing_scenario.empty_streams or []) + (
106                        scenario.empty_streams or []
107                    )
108                    merged_scenario = existing_scenario.model_copy(
109                        update={"empty_streams": list(set(all_empty_streams))}
110                    )
111                    deduped_scenarios.remove(existing_scenario)
112                    deduped_scenarios.append(merged_scenario)
113                    break
114            else:
115                # If a scenario does not exist with the config, add the new scenario to the list.
116                deduped_scenarios.append(scenario)
117        return deduped_scenarios
118
119    @classmethod
120    def get_scenarios(
121        cls,
122    ) -> list[ConnectorTestScenario]:
123        """Get acceptance tests for a given category.
124
125        This has to be a separate function because pytest does not allow
126        parametrization of fixtures with arguments from the test class itself.
127        """
128        try:
129            all_tests_config = cls.acceptance_test_config
130        except FileNotFoundError as e:
131            # Destinations sometimes do not have an acceptance tests file.
132            warnings.warn(
133                f"Acceptance test config file not found: {e!s}. No scenarios will be loaded.",
134                category=UserWarning,
135                stacklevel=1,
136            )
137            return []
138
139        test_scenarios: list[ConnectorTestScenario] = []
140        # we look in the basic_read section to find any empty streams
141        for category in ["spec", "connection", "basic_read"]:
142            if (
143                category not in all_tests_config["acceptance_tests"]
144                or "tests" not in all_tests_config["acceptance_tests"][category]
145            ):
146                continue
147
148            for test in all_tests_config["acceptance_tests"][category]["tests"]:
149                if "config_path" not in test:
150                    # Skip tests without a config_path
151                    continue
152
153                if "iam_role" in test["config_path"]:
154                    # We skip iam_role tests for now, as they are not supported in the test suite.
155                    continue
156
157                scenario = ConnectorTestScenario.model_validate(test)
158
159                test_scenarios.append(scenario)
160
161        deduped_test_scenarios = cls._dedup_scenarios(test_scenarios)
162
163        return deduped_test_scenarios
164
165    @pytest.mark.skipif(
166        shutil.which("docker") is None,
167        reason="docker CLI not found in PATH, skipping docker image tests",
168    )
169    @pytest.mark.image_tests
170    def test_docker_image_build_and_spec(
171        self,
172        connector_image_override: str | None,
173    ) -> None:
174        """Run `docker_image` acceptance tests."""
175        connector_root = self.get_connector_root_dir().absolute()
176        metadata = MetadataFile.from_file(connector_root / "metadata.yaml")
177
178        connector_image: str | None = connector_image_override
179        if not connector_image:
180            tag = "dev-latest"
181            connector_image = build_connector_image(
182                connector_name=connector_root.absolute().name,
183                connector_directory=connector_root,
184                metadata=metadata,
185                tag=tag,
186                no_verify=False,
187            )
188
189        _ = run_docker_airbyte_command(
190            [
191                "docker",
192                "run",
193                "--rm",
194                connector_image,
195                "spec",
196            ],
197            raise_if_errors=True,
198        )
199
200    @pytest.mark.skipif(
201        shutil.which("docker") is None,
202        reason="docker CLI not found in PATH, skipping docker image tests",
203    )
204    @pytest.mark.image_tests
205    def test_docker_image_build_and_check(
206        self,
207        scenario: ConnectorTestScenario,
208        connector_image_override: str | None,
209    ) -> None:
210        """Run `docker_image` acceptance tests.
211
212        This test builds the connector image and runs the `check` command inside the container.
213
214        Note:
215          - It is expected for docker image caches to be reused between test runs.
216          - In the rare case that image caches need to be cleared, please clear
217            the local docker image cache using `docker image prune -a` command.
218        """
219        if scenario.expected_outcome.expect_exception():
220            pytest.skip("Skipping test_docker_image_build_and_check (expected to fail).")
221
222        tag = "dev-latest"
223        connector_root = self.get_connector_root_dir()
224        metadata = MetadataFile.from_file(connector_root / "metadata.yaml")
225        connector_image: str | None = connector_image_override
226        if not connector_image:
227            tag = "dev-latest"
228            connector_image = build_connector_image(
229                connector_name=connector_root.absolute().name,
230                connector_directory=connector_root,
231                metadata=metadata,
232                tag=tag,
233                no_verify=False,
234            )
235
236        container_config_path = "/secrets/config.json"
237        with scenario.with_temp_config_file(
238            connector_root=connector_root,
239        ) as temp_config_file:
240            _ = run_docker_airbyte_command(
241                [
242                    "docker",
243                    "run",
244                    "--rm",
245                    "-v",
246                    f"{temp_config_file}:{container_config_path}",
247                    connector_image,
248                    "check",
249                    "--config",
250                    container_config_path,
251                ],
252                raise_if_errors=True,
253            )
254
255    @pytest.mark.skipif(
256        shutil.which("docker") is None,
257        reason="docker CLI not found in PATH, skipping docker image tests",
258    )
259    @pytest.mark.image_tests
260    def test_docker_image_build_and_read(
261        self,
262        scenario: ConnectorTestScenario,
263        connector_image_override: str | None,
264        read_from_streams: Literal["all", "none", "default"] | list[str],
265        read_scenarios: Literal["all", "none", "default"] | list[str],
266    ) -> None:
267        """Read from the connector's Docker image.
268
269        This test builds the connector image and runs the `read` command inside the container.
270
271        Note:
272          - It is expected for docker image caches to be reused between test runs.
273          - In the rare case that image caches need to be cleared, please clear
274            the local docker image cache using `docker image prune -a` command.
275          - If the --connector-image arg is provided, it will be used instead of building the image.
276        """
277        if self.is_destination_connector():
278            pytest.skip("Skipping read test for destination connector.")
279
280        if scenario.expected_outcome.expect_exception():
281            pytest.skip("Skipping (expected to fail).")
282
283        if read_from_streams == "none":
284            pytest.skip("Skipping read test (`--read-from-streams=false`).")
285
286        if read_scenarios == "none":
287            pytest.skip("Skipping (`--read-scenarios=none`).")
288
289        default_scenario_ids = ["config", "valid_config", "default"]
290        if read_scenarios == "all":
291            pass
292        elif read_scenarios == "default":
293            if scenario.id not in default_scenario_ids:
294                pytest.skip(
295                    f"Skipping read test for scenario '{scenario.id}' "
296                    f"(not in default scenarios list '{default_scenario_ids}')."
297                )
298        elif scenario.id not in read_scenarios:
299            # pytest.skip(
300            raise ValueError(
301                f"Skipping read test for scenario '{scenario.id}' "
302                f"(not in --read-scenarios={read_scenarios})."
303            )
304
305        tag = "dev-latest"
306        connector_root = self.get_connector_root_dir()
307        connector_name = connector_root.absolute().name
308        metadata = MetadataFile.from_file(connector_root / "metadata.yaml")
309        connector_image: str | None = connector_image_override
310        if not connector_image:
311            tag = "dev-latest"
312            connector_image = build_connector_image(
313                connector_name=connector_name,
314                connector_directory=connector_root,
315                metadata=metadata,
316                tag=tag,
317                no_verify=False,
318            )
319
320        container_config_path = "/secrets/config.json"
321        container_catalog_path = "/secrets/catalog.json"
322
323        with (
324            scenario.with_temp_config_file(
325                connector_root=connector_root,
326            ) as temp_config_file,
327            tempfile.TemporaryDirectory(
328                prefix=f"{connector_name}-test",
329                ignore_cleanup_errors=True,
330            ) as temp_dir_str,
331        ):
332            temp_dir = Path(temp_dir_str)
333            discover_result = run_docker_airbyte_command(
334                [
335                    "docker",
336                    "run",
337                    "--rm",
338                    "-v",
339                    f"{temp_config_file}:{container_config_path}",
340                    connector_image,
341                    "discover",
342                    "--config",
343                    container_config_path,
344                ],
345                raise_if_errors=True,
346            )
347
348            catalog_message = discover_result.catalog  # Get catalog message
349            assert catalog_message.catalog is not None, "Catalog message missing catalog."
350            discovered_catalog: AirbyteCatalog = catalog_message.catalog
351            if not discovered_catalog.streams:
352                raise ValueError(
353                    f"Discovered catalog for connector '{connector_name}' is empty. "
354                    "Please check the connector's discover implementation."
355                )
356
357            streams_list = [stream.name for stream in discovered_catalog.streams]
358            if read_from_streams == "default" and metadata.data.suggestedStreams:
359                # set `streams_list` to be the intersection of discovered and suggested streams.
360                streams_list = list(set(streams_list) & set(metadata.data.suggestedStreams.streams))
361
362            if isinstance(read_from_streams, list):
363                # If `read_from_streams` is a list, we filter the discovered streams.
364                streams_list = list(set(streams_list) & set(read_from_streams))
365
366            if scenario.empty_streams:
367                # Filter out streams marked as empty in the scenario.
368                empty_stream_names = [stream.name for stream in scenario.empty_streams]
369                streams_list = [s for s in streams_list if s.name not in empty_stream_names]
370
371            configured_catalog: ConfiguredAirbyteCatalog = ConfiguredAirbyteCatalog(
372                streams=[
373                    ConfiguredAirbyteStream(
374                        stream=stream,
375                        sync_mode=(
376                            stream.supported_sync_modes[0]
377                            if stream.supported_sync_modes
378                            else SyncMode.full_refresh
379                        ),
380                        destination_sync_mode=DestinationSyncMode.append,
381                    )
382                    for stream in discovered_catalog.streams
383                    if stream.name in streams_list
384                ]
385            )
386            configured_catalog_path = temp_dir / "catalog.json"
387            configured_catalog_path.write_text(
388                orjson.dumps(asdict(configured_catalog)).decode("utf-8")
389            )
390            read_result: EntrypointOutput = run_docker_airbyte_command(
391                [
392                    "docker",
393                    "run",
394                    "--rm",
395                    "-v",
396                    f"{temp_config_file}:{container_config_path}",
397                    "-v",
398                    f"{configured_catalog_path}:{container_catalog_path}",
399                    connector_image,
400                    "read",
401                    "--config",
402                    container_config_path,
403                    "--catalog",
404                    container_catalog_path,
405                ],
406                raise_if_errors=True,
407            )
class DockerConnectorTestSuite:
 41class DockerConnectorTestSuite:
 42    """Base class for connector test suites."""
 43
 44    @classmethod
 45    def get_test_class_dir(cls) -> Path:
 46        """Get the file path that contains the class."""
 47        module = sys.modules[cls.__module__]
 48        # Get the directory containing the test file
 49        return Path(inspect.getfile(module)).parent
 50
 51    @classmethod
 52    def get_connector_root_dir(cls) -> Path:
 53        """Get the root directory of the connector."""
 54        return find_connector_root([cls.get_test_class_dir(), Path.cwd()])
 55
 56    @classproperty
 57    def connector_name(self) -> str:
 58        """Get the name of the connector."""
 59        connector_root = self.get_connector_root_dir()
 60        return connector_root.absolute().name
 61
 62    @classmethod
 63    def is_destination_connector(cls) -> bool:
 64        """Check if the connector is a destination."""
 65        return cast(str, cls.connector_name).startswith("destination-")
 66
 67    @classproperty
 68    def acceptance_test_config(cls) -> Any:
 69        """Get the contents of acceptance test config file.
 70
 71        Also perform some basic validation that the file has the expected structure.
 72        """
 73        acceptance_test_config_path = cls.get_connector_root_dir() / ACCEPTANCE_TEST_CONFIG
 74        if not acceptance_test_config_path.exists():
 75            raise FileNotFoundError(
 76                f"Acceptance test config file not found at: {str(acceptance_test_config_path)}"
 77            )
 78
 79        tests_config = yaml.safe_load(acceptance_test_config_path.read_text())
 80
 81        if "acceptance_tests" not in tests_config:
 82            raise ValueError(
 83                f"Acceptance tests config not found in {acceptance_test_config_path}."
 84                f" Found only: {str(tests_config)}."
 85            )
 86        return tests_config
 87
 88    @staticmethod
 89    def _dedup_scenarios(scenarios: list[ConnectorTestScenario]) -> list[ConnectorTestScenario]:
 90        """
 91        For FAST tests, we treat each config as a separate test scenario to run against, whereas CATs defined
 92        a series of more granular scenarios specifying a config_path and empty_streams among other things.
 93
 94        This method deduplicates the CATs scenarios based on their config_path. In doing so, we choose to
 95        take the union of any defined empty_streams, to have high confidence that runnning a read with the
 96        config will not error on the lack of data in the empty streams or lack of permissions to read them.
 97
 98        """
 99        deduped_scenarios: list[ConnectorTestScenario] = []
100
101        for scenario in scenarios:
102            for existing_scenario in deduped_scenarios:
103                if scenario.config_path == existing_scenario.config_path:
104                    # If a scenario with the same config_path already exists, we merge the empty streams.
105                    # scenarios are immutable, so we create a new one.
106                    all_empty_streams = (existing_scenario.empty_streams or []) + (
107                        scenario.empty_streams or []
108                    )
109                    merged_scenario = existing_scenario.model_copy(
110                        update={"empty_streams": list(set(all_empty_streams))}
111                    )
112                    deduped_scenarios.remove(existing_scenario)
113                    deduped_scenarios.append(merged_scenario)
114                    break
115            else:
116                # If a scenario does not exist with the config, add the new scenario to the list.
117                deduped_scenarios.append(scenario)
118        return deduped_scenarios
119
120    @classmethod
121    def get_scenarios(
122        cls,
123    ) -> list[ConnectorTestScenario]:
124        """Get acceptance tests for a given category.
125
126        This has to be a separate function because pytest does not allow
127        parametrization of fixtures with arguments from the test class itself.
128        """
129        try:
130            all_tests_config = cls.acceptance_test_config
131        except FileNotFoundError as e:
132            # Destinations sometimes do not have an acceptance tests file.
133            warnings.warn(
134                f"Acceptance test config file not found: {e!s}. No scenarios will be loaded.",
135                category=UserWarning,
136                stacklevel=1,
137            )
138            return []
139
140        test_scenarios: list[ConnectorTestScenario] = []
141        # we look in the basic_read section to find any empty streams
142        for category in ["spec", "connection", "basic_read"]:
143            if (
144                category not in all_tests_config["acceptance_tests"]
145                or "tests" not in all_tests_config["acceptance_tests"][category]
146            ):
147                continue
148
149            for test in all_tests_config["acceptance_tests"][category]["tests"]:
150                if "config_path" not in test:
151                    # Skip tests without a config_path
152                    continue
153
154                if "iam_role" in test["config_path"]:
155                    # We skip iam_role tests for now, as they are not supported in the test suite.
156                    continue
157
158                scenario = ConnectorTestScenario.model_validate(test)
159
160                test_scenarios.append(scenario)
161
162        deduped_test_scenarios = cls._dedup_scenarios(test_scenarios)
163
164        return deduped_test_scenarios
165
166    @pytest.mark.skipif(
167        shutil.which("docker") is None,
168        reason="docker CLI not found in PATH, skipping docker image tests",
169    )
170    @pytest.mark.image_tests
171    def test_docker_image_build_and_spec(
172        self,
173        connector_image_override: str | None,
174    ) -> None:
175        """Run `docker_image` acceptance tests."""
176        connector_root = self.get_connector_root_dir().absolute()
177        metadata = MetadataFile.from_file(connector_root / "metadata.yaml")
178
179        connector_image: str | None = connector_image_override
180        if not connector_image:
181            tag = "dev-latest"
182            connector_image = build_connector_image(
183                connector_name=connector_root.absolute().name,
184                connector_directory=connector_root,
185                metadata=metadata,
186                tag=tag,
187                no_verify=False,
188            )
189
190        _ = run_docker_airbyte_command(
191            [
192                "docker",
193                "run",
194                "--rm",
195                connector_image,
196                "spec",
197            ],
198            raise_if_errors=True,
199        )
200
201    @pytest.mark.skipif(
202        shutil.which("docker") is None,
203        reason="docker CLI not found in PATH, skipping docker image tests",
204    )
205    @pytest.mark.image_tests
206    def test_docker_image_build_and_check(
207        self,
208        scenario: ConnectorTestScenario,
209        connector_image_override: str | None,
210    ) -> None:
211        """Run `docker_image` acceptance tests.
212
213        This test builds the connector image and runs the `check` command inside the container.
214
215        Note:
216          - It is expected for docker image caches to be reused between test runs.
217          - In the rare case that image caches need to be cleared, please clear
218            the local docker image cache using `docker image prune -a` command.
219        """
220        if scenario.expected_outcome.expect_exception():
221            pytest.skip("Skipping test_docker_image_build_and_check (expected to fail).")
222
223        tag = "dev-latest"
224        connector_root = self.get_connector_root_dir()
225        metadata = MetadataFile.from_file(connector_root / "metadata.yaml")
226        connector_image: str | None = connector_image_override
227        if not connector_image:
228            tag = "dev-latest"
229            connector_image = build_connector_image(
230                connector_name=connector_root.absolute().name,
231                connector_directory=connector_root,
232                metadata=metadata,
233                tag=tag,
234                no_verify=False,
235            )
236
237        container_config_path = "/secrets/config.json"
238        with scenario.with_temp_config_file(
239            connector_root=connector_root,
240        ) as temp_config_file:
241            _ = run_docker_airbyte_command(
242                [
243                    "docker",
244                    "run",
245                    "--rm",
246                    "-v",
247                    f"{temp_config_file}:{container_config_path}",
248                    connector_image,
249                    "check",
250                    "--config",
251                    container_config_path,
252                ],
253                raise_if_errors=True,
254            )
255
256    @pytest.mark.skipif(
257        shutil.which("docker") is None,
258        reason="docker CLI not found in PATH, skipping docker image tests",
259    )
260    @pytest.mark.image_tests
261    def test_docker_image_build_and_read(
262        self,
263        scenario: ConnectorTestScenario,
264        connector_image_override: str | None,
265        read_from_streams: Literal["all", "none", "default"] | list[str],
266        read_scenarios: Literal["all", "none", "default"] | list[str],
267    ) -> None:
268        """Read from the connector's Docker image.
269
270        This test builds the connector image and runs the `read` command inside the container.
271
272        Note:
273          - It is expected for docker image caches to be reused between test runs.
274          - In the rare case that image caches need to be cleared, please clear
275            the local docker image cache using `docker image prune -a` command.
276          - If the --connector-image arg is provided, it will be used instead of building the image.
277        """
278        if self.is_destination_connector():
279            pytest.skip("Skipping read test for destination connector.")
280
281        if scenario.expected_outcome.expect_exception():
282            pytest.skip("Skipping (expected to fail).")
283
284        if read_from_streams == "none":
285            pytest.skip("Skipping read test (`--read-from-streams=false`).")
286
287        if read_scenarios == "none":
288            pytest.skip("Skipping (`--read-scenarios=none`).")
289
290        default_scenario_ids = ["config", "valid_config", "default"]
291        if read_scenarios == "all":
292            pass
293        elif read_scenarios == "default":
294            if scenario.id not in default_scenario_ids:
295                pytest.skip(
296                    f"Skipping read test for scenario '{scenario.id}' "
297                    f"(not in default scenarios list '{default_scenario_ids}')."
298                )
299        elif scenario.id not in read_scenarios:
300            # pytest.skip(
301            raise ValueError(
302                f"Skipping read test for scenario '{scenario.id}' "
303                f"(not in --read-scenarios={read_scenarios})."
304            )
305
306        tag = "dev-latest"
307        connector_root = self.get_connector_root_dir()
308        connector_name = connector_root.absolute().name
309        metadata = MetadataFile.from_file(connector_root / "metadata.yaml")
310        connector_image: str | None = connector_image_override
311        if not connector_image:
312            tag = "dev-latest"
313            connector_image = build_connector_image(
314                connector_name=connector_name,
315                connector_directory=connector_root,
316                metadata=metadata,
317                tag=tag,
318                no_verify=False,
319            )
320
321        container_config_path = "/secrets/config.json"
322        container_catalog_path = "/secrets/catalog.json"
323
324        with (
325            scenario.with_temp_config_file(
326                connector_root=connector_root,
327            ) as temp_config_file,
328            tempfile.TemporaryDirectory(
329                prefix=f"{connector_name}-test",
330                ignore_cleanup_errors=True,
331            ) as temp_dir_str,
332        ):
333            temp_dir = Path(temp_dir_str)
334            discover_result = run_docker_airbyte_command(
335                [
336                    "docker",
337                    "run",
338                    "--rm",
339                    "-v",
340                    f"{temp_config_file}:{container_config_path}",
341                    connector_image,
342                    "discover",
343                    "--config",
344                    container_config_path,
345                ],
346                raise_if_errors=True,
347            )
348
349            catalog_message = discover_result.catalog  # Get catalog message
350            assert catalog_message.catalog is not None, "Catalog message missing catalog."
351            discovered_catalog: AirbyteCatalog = catalog_message.catalog
352            if not discovered_catalog.streams:
353                raise ValueError(
354                    f"Discovered catalog for connector '{connector_name}' is empty. "
355                    "Please check the connector's discover implementation."
356                )
357
358            streams_list = [stream.name for stream in discovered_catalog.streams]
359            if read_from_streams == "default" and metadata.data.suggestedStreams:
360                # set `streams_list` to be the intersection of discovered and suggested streams.
361                streams_list = list(set(streams_list) & set(metadata.data.suggestedStreams.streams))
362
363            if isinstance(read_from_streams, list):
364                # If `read_from_streams` is a list, we filter the discovered streams.
365                streams_list = list(set(streams_list) & set(read_from_streams))
366
367            if scenario.empty_streams:
368                # Filter out streams marked as empty in the scenario.
369                empty_stream_names = [stream.name for stream in scenario.empty_streams]
370                streams_list = [s for s in streams_list if s.name not in empty_stream_names]
371
372            configured_catalog: ConfiguredAirbyteCatalog = ConfiguredAirbyteCatalog(
373                streams=[
374                    ConfiguredAirbyteStream(
375                        stream=stream,
376                        sync_mode=(
377                            stream.supported_sync_modes[0]
378                            if stream.supported_sync_modes
379                            else SyncMode.full_refresh
380                        ),
381                        destination_sync_mode=DestinationSyncMode.append,
382                    )
383                    for stream in discovered_catalog.streams
384                    if stream.name in streams_list
385                ]
386            )
387            configured_catalog_path = temp_dir / "catalog.json"
388            configured_catalog_path.write_text(
389                orjson.dumps(asdict(configured_catalog)).decode("utf-8")
390            )
391            read_result: EntrypointOutput = run_docker_airbyte_command(
392                [
393                    "docker",
394                    "run",
395                    "--rm",
396                    "-v",
397                    f"{temp_config_file}:{container_config_path}",
398                    "-v",
399                    f"{configured_catalog_path}:{container_catalog_path}",
400                    connector_image,
401                    "read",
402                    "--config",
403                    container_config_path,
404                    "--catalog",
405                    container_catalog_path,
406                ],
407                raise_if_errors=True,
408            )

Base class for connector test suites.

@classmethod
def get_test_class_dir(cls) -> pathlib.Path:
44    @classmethod
45    def get_test_class_dir(cls) -> Path:
46        """Get the file path that contains the class."""
47        module = sys.modules[cls.__module__]
48        # Get the directory containing the test file
49        return Path(inspect.getfile(module)).parent

Get the directory containing the file in which the class is defined.

@classmethod
def get_connector_root_dir(cls) -> pathlib.Path:
51    @classmethod
52    def get_connector_root_dir(cls) -> Path:
53        """Get the root directory of the connector."""
54        return find_connector_root([cls.get_test_class_dir(), Path.cwd()])

Get the root directory of the connector.

def connector_name(unknown):

Get the name of the connector. This is a class property: much like a property, but the wrapped get function is a class method. For simplicity, only read-only properties are implemented.

@classmethod
def is_destination_connector(cls) -> bool:
62    @classmethod
63    def is_destination_connector(cls) -> bool:
64        """Check if the connector is a destination."""
65        return cast(str, cls.connector_name).startswith("destination-")

Check if the connector is a destination.

def acceptance_test_config(unknown):

Much like a property, but the wrapped get function is a class method. For simplicity, only read-only properties are implemented.

@classmethod
def get_scenarios(cls) -> list[airbyte_cdk.test.models.ConnectorTestScenario]:
120    @classmethod
121    def get_scenarios(
122        cls,
123    ) -> list[ConnectorTestScenario]:
124        """Get acceptance tests for a given category.
125
126        This has to be a separate function because pytest does not allow
127        parametrization of fixtures with arguments from the test class itself.
128        """
129        try:
130            all_tests_config = cls.acceptance_test_config
131        except FileNotFoundError as e:
132            # Destinations sometimes do not have an acceptance tests file.
133            warnings.warn(
134                f"Acceptance test config file not found: {e!s}. No scenarios will be loaded.",
135                category=UserWarning,
136                stacklevel=1,
137            )
138            return []
139
140        test_scenarios: list[ConnectorTestScenario] = []
141        # we look in the basic_read section to find any empty streams
142        for category in ["spec", "connection", "basic_read"]:
143            if (
144                category not in all_tests_config["acceptance_tests"]
145                or "tests" not in all_tests_config["acceptance_tests"][category]
146            ):
147                continue
148
149            for test in all_tests_config["acceptance_tests"][category]["tests"]:
150                if "config_path" not in test:
151                    # Skip tests without a config_path
152                    continue
153
154                if "iam_role" in test["config_path"]:
155                    # We skip iam_role tests for now, as they are not supported in the test suite.
156                    continue
157
158                scenario = ConnectorTestScenario.model_validate(test)
159
160                test_scenarios.append(scenario)
161
162        deduped_test_scenarios = cls._dedup_scenarios(test_scenarios)
163
164        return deduped_test_scenarios

Get acceptance test scenarios from the spec, connection, and basic_read categories.

This has to be a separate function because pytest does not allow parametrization of fixtures with arguments from the test class itself.

@pytest.mark.skipif(shutil.which('docker') is None, reason='docker CLI not found in PATH, skipping docker image tests')
@pytest.mark.image_tests
def test_docker_image_build_and_spec(self, connector_image_override: str | None) -> None:
166    @pytest.mark.skipif(
167        shutil.which("docker") is None,
168        reason="docker CLI not found in PATH, skipping docker image tests",
169    )
170    @pytest.mark.image_tests
171    def test_docker_image_build_and_spec(
172        self,
173        connector_image_override: str | None,
174    ) -> None:
175        """Run `docker_image` acceptance tests."""
176        connector_root = self.get_connector_root_dir().absolute()
177        metadata = MetadataFile.from_file(connector_root / "metadata.yaml")
178
179        connector_image: str | None = connector_image_override
180        if not connector_image:
181            tag = "dev-latest"
182            connector_image = build_connector_image(
183                connector_name=connector_root.absolute().name,
184                connector_directory=connector_root,
185                metadata=metadata,
186                tag=tag,
187                no_verify=False,
188            )
189
190        _ = run_docker_airbyte_command(
191            [
192                "docker",
193                "run",
194                "--rm",
195                connector_image,
196                "spec",
197            ],
198            raise_if_errors=True,
199        )

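Run docker_image acceptance tests: build the connector image (unless an image override is given) and run the spec command inside the container.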

@pytest.mark.skipif(shutil.which('docker') is None, reason='docker CLI not found in PATH, skipping docker image tests')
@pytest.mark.image_tests
def test_docker_image_build_and_check( self, scenario: airbyte_cdk.test.models.ConnectorTestScenario, connector_image_override: str | None) -> None:
201    @pytest.mark.skipif(
202        shutil.which("docker") is None,
203        reason="docker CLI not found in PATH, skipping docker image tests",
204    )
205    @pytest.mark.image_tests
206    def test_docker_image_build_and_check(
207        self,
208        scenario: ConnectorTestScenario,
209        connector_image_override: str | None,
210    ) -> None:
211        """Run `docker_image` acceptance tests.
212
213        This test builds the connector image and runs the `check` command inside the container.
214
215        Note:
216          - It is expected for docker image caches to be reused between test runs.
217          - In the rare case that image caches need to be cleared, please clear
218            the local docker image cache using `docker image prune -a` command.
219        """
220        if scenario.expected_outcome.expect_exception():
221            pytest.skip("Skipping test_docker_image_build_and_check (expected to fail).")
222
223        tag = "dev-latest"
224        connector_root = self.get_connector_root_dir()
225        metadata = MetadataFile.from_file(connector_root / "metadata.yaml")
226        connector_image: str | None = connector_image_override
227        if not connector_image:
228            tag = "dev-latest"
229            connector_image = build_connector_image(
230                connector_name=connector_root.absolute().name,
231                connector_directory=connector_root,
232                metadata=metadata,
233                tag=tag,
234                no_verify=False,
235            )
236
237        container_config_path = "/secrets/config.json"
238        with scenario.with_temp_config_file(
239            connector_root=connector_root,
240        ) as temp_config_file:
241            _ = run_docker_airbyte_command(
242                [
243                    "docker",
244                    "run",
245                    "--rm",
246                    "-v",
247                    f"{temp_config_file}:{container_config_path}",
248                    connector_image,
249                    "check",
250                    "--config",
251                    container_config_path,
252                ],
253                raise_if_errors=True,
254            )

Run docker_image acceptance tests.

This test builds the connector image and runs the check command inside the container.

Note:
  • It is expected for docker image caches to be reused between test runs.
  • In the rare case that image caches need to be cleared, please clear the local docker image cache using docker image prune -a command.
@pytest.mark.skipif(shutil.which('docker') is None, reason='docker CLI not found in PATH, skipping docker image tests')
@pytest.mark.image_tests
def test_docker_image_build_and_read( self, scenario: airbyte_cdk.test.models.ConnectorTestScenario, connector_image_override: str | None, read_from_streams: Union[Literal['all', 'none', 'default'], list[str]], read_scenarios: Union[Literal['all', 'none', 'default'], list[str]]) -> None:
256    @pytest.mark.skipif(
257        shutil.which("docker") is None,
258        reason="docker CLI not found in PATH, skipping docker image tests",
259    )
260    @pytest.mark.image_tests
261    def test_docker_image_build_and_read(
262        self,
263        scenario: ConnectorTestScenario,
264        connector_image_override: str | None,
265        read_from_streams: Literal["all", "none", "default"] | list[str],
266        read_scenarios: Literal["all", "none", "default"] | list[str],
267    ) -> None:
268        """Read from the connector's Docker image.
269
270        This test builds the connector image and runs the `read` command inside the container.
271
272        Note:
273          - It is expected for docker image caches to be reused between test runs.
274          - In the rare case that image caches need to be cleared, please clear
275            the local docker image cache using `docker image prune -a` command.
276          - If the --connector-image arg is provided, it will be used instead of building the image.
277        """
278        if self.is_destination_connector():
279            pytest.skip("Skipping read test for destination connector.")
280
281        if scenario.expected_outcome.expect_exception():
282            pytest.skip("Skipping (expected to fail).")
283
284        if read_from_streams == "none":
285            pytest.skip("Skipping read test (`--read-from-streams=false`).")
286
287        if read_scenarios == "none":
288            pytest.skip("Skipping (`--read-scenarios=none`).")
289
290        default_scenario_ids = ["config", "valid_config", "default"]
291        if read_scenarios == "all":
292            pass
293        elif read_scenarios == "default":
294            if scenario.id not in default_scenario_ids:
295                pytest.skip(
296                    f"Skipping read test for scenario '{scenario.id}' "
297                    f"(not in default scenarios list '{default_scenario_ids}')."
298                )
299        elif scenario.id not in read_scenarios:
300            # pytest.skip(
301            raise ValueError(
302                f"Skipping read test for scenario '{scenario.id}' "
303                f"(not in --read-scenarios={read_scenarios})."
304            )
305
306        tag = "dev-latest"
307        connector_root = self.get_connector_root_dir()
308        connector_name = connector_root.absolute().name
309        metadata = MetadataFile.from_file(connector_root / "metadata.yaml")
310        connector_image: str | None = connector_image_override
311        if not connector_image:
312            tag = "dev-latest"
313            connector_image = build_connector_image(
314                connector_name=connector_name,
315                connector_directory=connector_root,
316                metadata=metadata,
317                tag=tag,
318                no_verify=False,
319            )
320
321        container_config_path = "/secrets/config.json"
322        container_catalog_path = "/secrets/catalog.json"
323
324        with (
325            scenario.with_temp_config_file(
326                connector_root=connector_root,
327            ) as temp_config_file,
328            tempfile.TemporaryDirectory(
329                prefix=f"{connector_name}-test",
330                ignore_cleanup_errors=True,
331            ) as temp_dir_str,
332        ):
333            temp_dir = Path(temp_dir_str)
334            discover_result = run_docker_airbyte_command(
335                [
336                    "docker",
337                    "run",
338                    "--rm",
339                    "-v",
340                    f"{temp_config_file}:{container_config_path}",
341                    connector_image,
342                    "discover",
343                    "--config",
344                    container_config_path,
345                ],
346                raise_if_errors=True,
347            )
348
349            catalog_message = discover_result.catalog  # Get catalog message
350            assert catalog_message.catalog is not None, "Catalog message missing catalog."
351            discovered_catalog: AirbyteCatalog = catalog_message.catalog
352            if not discovered_catalog.streams:
353                raise ValueError(
354                    f"Discovered catalog for connector '{connector_name}' is empty. "
355                    "Please check the connector's discover implementation."
356                )
357
358            streams_list = [stream.name for stream in discovered_catalog.streams]
359            if read_from_streams == "default" and metadata.data.suggestedStreams:
360                # set `streams_list` to be the intersection of discovered and suggested streams.
361                streams_list = list(set(streams_list) & set(metadata.data.suggestedStreams.streams))
362
363            if isinstance(read_from_streams, list):
364                # If `read_from_streams` is a list, we filter the discovered streams.
365                streams_list = list(set(streams_list) & set(read_from_streams))
366
367            if scenario.empty_streams:
368                # Filter out streams marked as empty in the scenario.
369                empty_stream_names = [stream.name for stream in scenario.empty_streams]
370                streams_list = [s for s in streams_list if s.name not in empty_stream_names]
371
372            configured_catalog: ConfiguredAirbyteCatalog = ConfiguredAirbyteCatalog(
373                streams=[
374                    ConfiguredAirbyteStream(
375                        stream=stream,
376                        sync_mode=(
377                            stream.supported_sync_modes[0]
378                            if stream.supported_sync_modes
379                            else SyncMode.full_refresh
380                        ),
381                        destination_sync_mode=DestinationSyncMode.append,
382                    )
383                    for stream in discovered_catalog.streams
384                    if stream.name in streams_list
385                ]
386            )
387            configured_catalog_path = temp_dir / "catalog.json"
388            configured_catalog_path.write_text(
389                orjson.dumps(asdict(configured_catalog)).decode("utf-8")
390            )
391            read_result: EntrypointOutput = run_docker_airbyte_command(
392                [
393                    "docker",
394                    "run",
395                    "--rm",
396                    "-v",
397                    f"{temp_config_file}:{container_config_path}",
398                    "-v",
399                    f"{configured_catalog_path}:{container_catalog_path}",
400                    connector_image,
401                    "read",
402                    "--config",
403                    container_config_path,
404                    "--catalog",
405                    container_catalog_path,
406                ],
407                raise_if_errors=True,
408            )

Read from the connector's Docker image.

This test builds the connector image and runs the read command inside the container.

Note:
  • It is expected for docker image caches to be reused between test runs.
  • In the rare case that image caches need to be cleared, please clear the local docker image cache using docker image prune -a command.
  • If the --connector-image arg is provided, it will be used instead of building the image.