airbyte_cdk.test.utils.data
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2 3from pydantic import FilePath 4 5 6def get_unit_test_folder(execution_folder: str) -> FilePath: 7 path = FilePath(execution_folder) 8 while path.name != "unit_tests": 9 if path.name == path.root or path.name == path.drive: 10 raise ValueError( 11 f"Could not find `unit_tests` folder as a parent of {execution_folder}" 12 ) 13 path = path.parent 14 return path 15 16 17def read_resource_file_contents(resource: str, test_location: str) -> str: 18 """Read the contents of a test data file from the test resource folder.""" 19 file_path = str( 20 get_unit_test_folder(test_location) / "resource" / "http" / "response" / f"{resource}" 21 ) 22 with open(file_path) as f: 23 response = f.read() 24 return response
def
get_unit_test_folder( execution_folder: str) -> Annotated[pathlib.Path, PathType(path_type='file')]:
7def get_unit_test_folder(execution_folder: str) -> FilePath: 8 path = FilePath(execution_folder) 9 while path.name != "unit_tests": 10 if path.name == path.root or path.name == path.drive: 11 raise ValueError( 12 f"Could not find `unit_tests` folder as a parent of {execution_folder}" 13 ) 14 path = path.parent 15 return path
def
read_resource_file_contents(resource: str, test_location: str) -> str:
18def read_resource_file_contents(resource: str, test_location: str) -> str: 19 """Read the contents of a test data file from the test resource folder.""" 20 file_path = str( 21 get_unit_test_folder(test_location) / "resource" / "http" / "response" / f"{resource}" 22 ) 23 with open(file_path) as f: 24 response = f.read() 25 return response
Read the contents of a test data file from the test resource folder.