airbyte_cdk.test.utils.data

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2
 3from pydantic import FilePath
 4
 5
 6def get_unit_test_folder(execution_folder: str) -> FilePath:
 7    path = FilePath(execution_folder)
 8    while path.name != "unit_tests":
 9        if path.name == path.root or path.name == path.drive:
10            raise ValueError(
11                f"Could not find `unit_tests` folder as a parent of {execution_folder}"
12            )
13        path = path.parent
14    return path
15
16
def read_resource_file_contents(resource: str, test_location: str) -> str:
    """Return the text of a canned HTTP response stored with the unit tests.

    The file is looked up at ``<unit_tests>/resource/http/response/<resource>``,
    where ``<unit_tests>`` is the folder that ``get_unit_test_folder`` resolves
    from ``test_location``.

    Args:
        resource: File name (or relative path) inside the response folder.
        test_location: Path at or below the ``unit_tests`` folder.

    Returns:
        The whole file content, read in text mode with the default encoding.
    """
    unit_tests_dir = get_unit_test_folder(test_location)
    response_dir = unit_tests_dir / "resource" / "http" / "response"
    target = str(response_dir / f"{resource}")
    with open(target) as handle:
        return handle.read()
def get_unit_test_folder(execution_folder: str) -> Annotated[pathlib.Path, PathType(path_type='file')]:
 7def get_unit_test_folder(execution_folder: str) -> FilePath:
 8    path = FilePath(execution_folder)
 9    while path.name != "unit_tests":
10        if path.name == path.root or path.name == path.drive:
11            raise ValueError(
12                f"Could not find `unit_tests` folder as a parent of {execution_folder}"
13            )
14        path = path.parent
15    return path
def read_resource_file_contents(resource: str, test_location: str) -> str:
def read_resource_file_contents(resource: str, test_location: str) -> str:
    """Return the text of a canned HTTP response stored with the unit tests.

    The file is looked up at ``<unit_tests>/resource/http/response/<resource>``,
    where ``<unit_tests>`` is the folder that ``get_unit_test_folder`` resolves
    from ``test_location``.

    Args:
        resource: File name (or relative path) inside the response folder.
        test_location: Path at or below the ``unit_tests`` folder.

    Returns:
        The whole file content, read in text mode with the default encoding.
    """
    unit_tests_dir = get_unit_test_folder(test_location)
    response_dir = unit_tests_dir / "resource" / "http" / "response"
    target = str(response_dir / f"{resource}")
    with open(target) as handle:
        return handle.read()

Read the contents of a test data file from the test resource folder.