airbyte_cdk.test.standard_tests.docker_base
Base class for connector test suites.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Base class for connector test suites."""

from __future__ import annotations

import inspect
import shutil
import sys
import tempfile
import warnings
from dataclasses import asdict
from pathlib import Path
from typing import Any, Literal, cast

import orjson
import pytest
import yaml
from boltons.typeutils import classproperty

from airbyte_cdk.models import (
    AirbyteCatalog,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
)
from airbyte_cdk.models.connector_metadata import MetadataFile
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.models import ConnectorTestScenario
from airbyte_cdk.utils.connector_paths import (
    ACCEPTANCE_TEST_CONFIG,
    find_connector_root,
)
from airbyte_cdk.utils.docker import (
    build_connector_image,
    run_docker_airbyte_command,
)


class DockerConnectorTestSuite:
    """Base class for connector test suites."""

    @classmethod
    def get_test_class_dir(cls) -> Path:
        """Get the file path that contains the class."""
        module = sys.modules[cls.__module__]
        # Get the directory containing the test file
        return Path(inspect.getfile(module)).parent

    @classmethod
    def get_connector_root_dir(cls) -> Path:
        """Get the root directory of the connector."""
        return find_connector_root([cls.get_test_class_dir(), Path.cwd()])

    @classproperty
    def connector_name(self) -> str:
        """Get the name of the connector."""
        connector_root = self.get_connector_root_dir()
        return connector_root.absolute().name

    @classmethod
    def is_destination_connector(cls) -> bool:
        """Check if the connector is a destination."""
        return cast(str, cls.connector_name).startswith("destination-")

    @classproperty
    def acceptance_test_config(cls) -> Any:
        """Get the contents of the acceptance test config file.

        Also perform some basic validation that the file has the expected structure.
        """
        acceptance_test_config_path = cls.get_connector_root_dir() / ACCEPTANCE_TEST_CONFIG
        if not acceptance_test_config_path.exists():
            raise FileNotFoundError(
                f"Acceptance test config file not found at: {str(acceptance_test_config_path)}"
            )

        tests_config = yaml.safe_load(acceptance_test_config_path.read_text())

        if "acceptance_tests" not in tests_config:
            raise ValueError(
                f"Acceptance tests config not found in {acceptance_test_config_path}."
                f" Found only: {str(tests_config)}."
            )
        return tests_config

    @staticmethod
    def _dedup_scenarios(scenarios: list[ConnectorTestScenario]) -> list[ConnectorTestScenario]:
        """
        For FAST tests, we treat each config as a separate test scenario to run against, whereas CATs defined
        a series of more granular scenarios specifying a config_path and empty_streams among other things.

        This method deduplicates the CATs scenarios based on their config_path. In doing so, we choose to
        take the union of any defined empty_streams, to have high confidence that running a read with the
        config will not error on the lack of data in the empty streams or lack of permissions to read them.
        """
        deduped_scenarios: list[ConnectorTestScenario] = []

        for scenario in scenarios:
            for existing_scenario in deduped_scenarios:
                if scenario.config_path == existing_scenario.config_path:
                    # If a scenario with the same config_path already exists, we merge the empty streams.
                    # Scenarios are immutable, so we create a new one.
                    all_empty_streams = (existing_scenario.empty_streams or []) + (
                        scenario.empty_streams or []
                    )
                    merged_scenario = existing_scenario.model_copy(
                        update={"empty_streams": list(set(all_empty_streams))}
                    )
                    deduped_scenarios.remove(existing_scenario)
                    deduped_scenarios.append(merged_scenario)
                    break
            else:
                # If a scenario does not exist with the config, add the new scenario to the list.
                deduped_scenarios.append(scenario)
        return deduped_scenarios

    @classmethod
    def get_scenarios(
        cls,
    ) -> list[ConnectorTestScenario]:
        """Get acceptance tests for a given category.

        This has to be a separate function because pytest does not allow
        parametrization of fixtures with arguments from the test class itself.
        """
        try:
            all_tests_config = cls.acceptance_test_config
        except FileNotFoundError as e:
            # Destinations sometimes do not have an acceptance tests file.
            warnings.warn(
                f"Acceptance test config file not found: {e!s}. No scenarios will be loaded.",
                category=UserWarning,
                stacklevel=1,
            )
            return []

        test_scenarios: list[ConnectorTestScenario] = []
        # We look in the basic_read section to find any empty streams.
        for category in ["spec", "connection", "basic_read"]:
            if (
                category not in all_tests_config["acceptance_tests"]
                or "tests" not in all_tests_config["acceptance_tests"][category]
            ):
                continue

            for test in all_tests_config["acceptance_tests"][category]["tests"]:
                if "config_path" not in test:
                    # Skip tests without a config_path.
                    continue

                if "iam_role" in test["config_path"]:
                    # We skip iam_role tests for now, as they are not supported in the test suite.
                    continue

                scenario = ConnectorTestScenario.model_validate(test)
                test_scenarios.append(scenario)

        deduped_test_scenarios = cls._dedup_scenarios(test_scenarios)
        return deduped_test_scenarios

    @pytest.mark.skipif(
        shutil.which("docker") is None,
        reason="docker CLI not found in PATH, skipping docker image tests",
    )
    @pytest.mark.image_tests
    def test_docker_image_build_and_spec(
        self,
        connector_image_override: str | None,
    ) -> None:
        """Run `docker_image` acceptance tests."""
        connector_root = self.get_connector_root_dir().absolute()
        metadata = MetadataFile.from_file(connector_root / "metadata.yaml")

        connector_image: str | None = connector_image_override
        if not connector_image:
            tag = "dev-latest"
            connector_image = build_connector_image(
                connector_name=connector_root.absolute().name,
                connector_directory=connector_root,
                metadata=metadata,
                tag=tag,
                no_verify=False,
            )

        _ = run_docker_airbyte_command(
            [
                "docker",
                "run",
                "--rm",
                connector_image,
                "spec",
            ],
            raise_if_errors=True,
        )

    @pytest.mark.skipif(
        shutil.which("docker") is None,
        reason="docker CLI not found in PATH, skipping docker image tests",
    )
    @pytest.mark.image_tests
    def test_docker_image_build_and_check(
        self,
        scenario: ConnectorTestScenario,
        connector_image_override: str | None,
    ) -> None:
        """Run `docker_image` acceptance tests.

        This test builds the connector image and runs the `check` command inside the container.

        Note:
        - It is expected for docker image caches to be reused between test runs.
        - In the rare case that image caches need to be cleared, please clear
          the local docker image cache using `docker image prune -a` command.
        """
        if scenario.expected_outcome.expect_exception():
            pytest.skip("Skipping test_docker_image_build_and_check (expected to fail).")

        tag = "dev-latest"
        connector_root = self.get_connector_root_dir()
        metadata = MetadataFile.from_file(connector_root / "metadata.yaml")
        connector_image: str | None = connector_image_override
        if not connector_image:
            tag = "dev-latest"
            connector_image = build_connector_image(
                connector_name=connector_root.absolute().name,
                connector_directory=connector_root,
                metadata=metadata,
                tag=tag,
                no_verify=False,
            )

        container_config_path = "/secrets/config.json"
        with scenario.with_temp_config_file(
            connector_root=connector_root,
        ) as temp_config_file:
            _ = run_docker_airbyte_command(
                [
                    "docker",
                    "run",
                    "--rm",
                    "-v",
                    f"{temp_config_file}:{container_config_path}",
                    connector_image,
                    "check",
                    "--config",
                    container_config_path,
                ],
                raise_if_errors=True,
            )

    @pytest.mark.skipif(
        shutil.which("docker") is None,
        reason="docker CLI not found in PATH, skipping docker image tests",
    )
    @pytest.mark.image_tests
    def test_docker_image_build_and_read(
        self,
        scenario: ConnectorTestScenario,
        connector_image_override: str | None,
        read_from_streams: Literal["all", "none", "default"] | list[str],
        read_scenarios: Literal["all", "none", "default"] | list[str],
    ) -> None:
        """Read from the connector's Docker image.

        This test builds the connector image and runs the `read` command inside the container.

        Note:
        - It is expected for docker image caches to be reused between test runs.
        - In the rare case that image caches need to be cleared, please clear
          the local docker image cache using `docker image prune -a` command.
        - If the --connector-image arg is provided, it will be used instead of building the image.
        """
        if self.is_destination_connector():
            pytest.skip("Skipping read test for destination connector.")

        if scenario.expected_outcome.expect_exception():
            pytest.skip("Skipping (expected to fail).")

        if read_from_streams == "none":
            pytest.skip("Skipping read test (`--read-from-streams=false`).")

        if read_scenarios == "none":
            pytest.skip("Skipping (`--read-scenarios=none`).")

        default_scenario_ids = ["config", "valid_config", "default"]
        if read_scenarios == "all":
            pass
        elif read_scenarios == "default":
            if scenario.id not in default_scenario_ids:
                pytest.skip(
                    f"Skipping read test for scenario '{scenario.id}' "
                    f"(not in default scenarios list '{default_scenario_ids}')."
                )
        elif scenario.id not in read_scenarios:
            # pytest.skip(
            raise ValueError(
                f"Skipping read test for scenario '{scenario.id}' "
                f"(not in --read-scenarios={read_scenarios})."
            )

        tag = "dev-latest"
        connector_root = self.get_connector_root_dir()
        connector_name = connector_root.absolute().name
        metadata = MetadataFile.from_file(connector_root / "metadata.yaml")
        connector_image: str | None = connector_image_override
        if not connector_image:
            tag = "dev-latest"
            connector_image = build_connector_image(
                connector_name=connector_name,
                connector_directory=connector_root,
                metadata=metadata,
                tag=tag,
                no_verify=False,
            )

        container_config_path = "/secrets/config.json"
        container_catalog_path = "/secrets/catalog.json"

        with (
            scenario.with_temp_config_file(
                connector_root=connector_root,
            ) as temp_config_file,
            tempfile.TemporaryDirectory(
                prefix=f"{connector_name}-test",
                ignore_cleanup_errors=True,
            ) as temp_dir_str,
        ):
            temp_dir = Path(temp_dir_str)
            discover_result = run_docker_airbyte_command(
                [
                    "docker",
                    "run",
                    "--rm",
                    "-v",
                    f"{temp_config_file}:{container_config_path}",
                    connector_image,
                    "discover",
                    "--config",
                    container_config_path,
                ],
                raise_if_errors=True,
            )

            catalog_message = discover_result.catalog  # Get catalog message
            assert catalog_message.catalog is not None, "Catalog message missing catalog."
            discovered_catalog: AirbyteCatalog = catalog_message.catalog
            if not discovered_catalog.streams:
                raise ValueError(
                    f"Discovered catalog for connector '{connector_name}' is empty. "
                    "Please check the connector's discover implementation."
                )

            streams_list = [stream.name for stream in discovered_catalog.streams]
            if read_from_streams == "default" and metadata.data.suggestedStreams:
                # Set `streams_list` to be the intersection of discovered and suggested streams.
                streams_list = list(
                    set(streams_list) & set(metadata.data.suggestedStreams.streams)
                )

            if isinstance(read_from_streams, list):
                # If `read_from_streams` is a list, we filter the discovered streams.
                streams_list = list(set(streams_list) & set(read_from_streams))

            if scenario.empty_streams:
                # Filter out streams marked as empty in the scenario.
                empty_stream_names = [stream.name for stream in scenario.empty_streams]
                streams_list = [s for s in streams_list if s not in empty_stream_names]

            configured_catalog: ConfiguredAirbyteCatalog = ConfiguredAirbyteCatalog(
                streams=[
                    ConfiguredAirbyteStream(
                        stream=stream,
                        sync_mode=(
                            stream.supported_sync_modes[0]
                            if stream.supported_sync_modes
                            else SyncMode.full_refresh
                        ),
                        destination_sync_mode=DestinationSyncMode.append,
                    )
                    for stream in discovered_catalog.streams
                    if stream.name in streams_list
                ]
            )
            configured_catalog_path = temp_dir / "catalog.json"
            configured_catalog_path.write_text(
                orjson.dumps(asdict(configured_catalog)).decode("utf-8")
            )
            read_result: EntrypointOutput = run_docker_airbyte_command(
                [
                    "docker",
                    "run",
                    "--rm",
                    "-v",
                    f"{temp_config_file}:{container_config_path}",
                    "-v",
                    f"{configured_catalog_path}:{container_catalog_path}",
                    connector_image,
                    "read",
                    "--config",
                    container_config_path,
                    "--catalog",
                    container_catalog_path,
                ],
                raise_if_errors=True,
            )
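The merging behavior of `_dedup_scenarios` can be illustrated with a small, self-contained sketch. The dict shapes below mirror entries from `acceptance-test-config.yml` and are an assumption about what `ConnectorTestScenario.model_validate` accepts; the helper itself is private, so this is illustrative rather than a supported API.

    from airbyte_cdk.test.models import ConnectorTestScenario
    from airbyte_cdk.test.standard_tests.docker_base import DockerConnectorTestSuite

    # Two scenarios point at the same config file, a third at a different one
    # (paths are illustrative).
    scenario_a = ConnectorTestScenario.model_validate({"config_path": "secrets/config.json"})
    scenario_b = ConnectorTestScenario.model_validate({"config_path": "secrets/config.json"})
    scenario_c = ConnectorTestScenario.model_validate({"config_path": "secrets/config_oauth.json"})

    deduped = DockerConnectorTestSuite._dedup_scenarios([scenario_a, scenario_b, scenario_c])
    # One scenario survives per distinct config_path; when duplicates declare
    # empty_streams, the merged scenario carries the union of both lists.
    assert len(deduped) == 2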
class DockerConnectorTestSuite:
Base class for connector test suites.
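Connector repositories consume this suite by subclassing it alongside their acceptance-test-config.yml. A minimal sketch, assuming a hypothetical connector named source-example; the pytest fixtures the test methods accept (such as `scenario` and `connector_image_override`) are supplied by the surrounding test plumbing and are not shown here.

    # e.g. unit_tests/integration/test_docker_image.py inside a hypothetical source-example connector.
    # Subclassing is enough: the base class locates the connector root from this file's
    # location and loads scenarios from acceptance-test-config.yml.
    from airbyte_cdk.test.standard_tests.docker_base import DockerConnectorTestSuite


    class TestSourceExampleDockerImage(DockerConnectorTestSuite):
        """Docker image tests for source-example (inherits the spec/check/read tests)."""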
@classmethod
def get_test_class_dir(cls) -> Path:
Get the file path that contains the class.
@classmethod
def get_connector_root_dir(cls) -> Path:
Get the root directory of the connector.
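For orientation, the resolution can be reproduced directly with the helper this method wraps; the snippet below is illustrative and simply mirrors the call in the method body.

    from pathlib import Path
    from airbyte_cdk.utils.connector_paths import find_connector_root

    # Same call the suite makes: search for the connector root starting from the
    # test module's directory and the current working directory.
    connector_root = find_connector_root([Path(__file__).parent, Path.cwd()])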
@classproperty
def connector_name(self) -> str:

Get the name of the connector.
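The name is derived purely from the connector directory's basename; a small illustration with a hypothetical checkout path:

    from pathlib import Path

    # connector_name for a connector living at this (hypothetical) location:
    root = Path("/workspace/airbyte-integrations/connectors/source-example")
    assert root.absolute().name == "source-example"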
@classmethod
def is_destination_connector(cls) -> bool:
Check if the connector is a destination.
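The check relies only on the directory-name convention, for example:

    # Name-based check: any connector directory prefixed "destination-" is treated
    # as a destination; everything else (e.g. "source-pokeapi") is not.
    assert "destination-postgres".startswith("destination-")
    assert not "source-pokeapi".startswith("destination-")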
@classproperty
def acceptance_test_config(cls) -> Any:

Get the contents of the acceptance test config file.

Also perform some basic validation that the file has the expected structure.
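The validation only requires a top-level `acceptance_tests` key; `get_scenarios` below then looks for per-category `tests` lists. A minimal config that satisfies both, parsed the same way the property parses the file (stream and path names are illustrative):

    import yaml

    minimal_config = yaml.safe_load(
        """
        acceptance_tests:
          spec:
            tests:
              - config_path: secrets/config.json
          basic_read:
            tests:
              - config_path: secrets/config.json
                empty_streams:
                  - name: audit_log
        """
    )
    # The only structural check acceptance_test_config performs:
    assert "acceptance_tests" in minimal_config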
@classmethod
def get_scenarios(cls) -> list[ConnectorTestScenario]:
Get acceptance tests for a given category.
This has to be a separate function because pytest does not allow parametrization of fixtures with arguments from the test class itself.
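A hedged sketch of how the returned scenarios can drive parametrization from a conftest; the fixture wiring below is illustrative and not the CDK's own plumbing.

    import pytest

    from my_connector_tests import TestSourceExampleDockerImage  # hypothetical subclass


    @pytest.fixture(
        params=TestSourceExampleDockerImage.get_scenarios(),
        ids=lambda scenario: scenario.id,
    )
    def scenario(request: pytest.FixtureRequest):
        # Each test method that accepts a `scenario` argument runs once per scenario.
        return request.param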
@pytest.mark.skipif(
    shutil.which("docker") is None,
    reason="docker CLI not found in PATH, skipping docker image tests",
)
@pytest.mark.image_tests
def test_docker_image_build_and_spec(self, connector_image_override: str | None) -> None:

Run `docker_image` acceptance tests.
@pytest.mark.skipif(
    shutil.which("docker") is None,
    reason="docker CLI not found in PATH, skipping docker image tests",
)
@pytest.mark.image_tests
def test_docker_image_build_and_check(
    self,
    scenario: ConnectorTestScenario,
    connector_image_override: str | None,
) -> None:

Run `docker_image` acceptance tests.

This test builds the connector image and runs the `check` command inside the container.

Note:
- It is expected for docker image caches to be reused between test runs.
- In the rare case that image caches need to be cleared, please clear the local docker image cache using `docker image prune -a` command.
@pytest.mark.skipif(
    shutil.which("docker") is None,
    reason="docker CLI not found in PATH, skipping docker image tests",
)
@pytest.mark.image_tests
def test_docker_image_build_and_read(
    self,
    scenario: ConnectorTestScenario,
    connector_image_override: str | None,
    read_from_streams: Literal["all", "none", "default"] | list[str],
    read_scenarios: Literal["all", "none", "default"] | list[str],
) -> None:

Read from the connector's Docker image.

This test builds the connector image and runs the `read` command inside the container.

Note:
- It is expected for docker image caches to be reused between test runs.
- In the rare case that image caches need to be cleared, please clear the local docker image cache using `docker image prune -a` command.
- If the --connector-image arg is provided, it will be used instead of building the image.
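The catalog the test mounts into the container is assembled from the discovered streams as shown in the source above. A minimal standalone sketch of that construction, using a hypothetical single stream in place of real `discover` output:

    from airbyte_cdk.models import (
        AirbyteStream,
        ConfiguredAirbyteCatalog,
        ConfiguredAirbyteStream,
        DestinationSyncMode,
        SyncMode,
    )

    # Hypothetical discovered stream; real streams come from the `discover` command.
    users_stream = AirbyteStream(
        name="users",
        json_schema={"type": "object"},
        supported_sync_modes=[SyncMode.full_refresh],
    )

    configured_catalog = ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=users_stream,
                # Mirror the test's choice: first supported sync mode, else full_refresh.
                sync_mode=(
                    users_stream.supported_sync_modes[0]
                    if users_stream.supported_sync_modes
                    else SyncMode.full_refresh
                ),
                destination_sync_mode=DestinationSyncMode.append,
            )
        ]
    )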