airbyte_cdk.sources.utils.schema_helpers
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import importlib
import json
import os
import pkgutil
from copy import deepcopy
from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Mapping, MutableMapping, Tuple, cast

import jsonref
from jsonschema import validate
from jsonschema.exceptions import ValidationError
from pydantic.v1 import BaseModel, Field
from referencing import Registry, Resource
from referencing._core import Resolver  # used for type hints
from referencing.jsonschema import DRAFT7

from airbyte_cdk.models import ConnectorSpecification, FailureType
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


class JsonFileLoader:
    """
    Custom json file loader to resolve references to resources located in the "shared" directory.
    We need this for compatibility with existing schemas because all of them have references
    pointing to shared_schema.json instead of shared/shared_schema.json.
    """

    def __init__(self, uri_base: str, shared: str):
        self.shared = shared
        self.uri_base = uri_base

    def __call__(self, uri: str) -> Dict[str, Any]:
        uri = uri.replace(self.uri_base, f"{self.uri_base}/{self.shared}/")
        with open(uri) as f:
            data = json.load(f)
            if isinstance(data, dict):
                return data
            else:
                raise ValueError(f"Expected to read a dictionary from {uri}. Got: {data}")


def resolve_ref_links(obj: Any) -> Any:
    """
    Scan resolved schema and convert jsonref.JsonRef objects to JSON serializable dicts.

    :param obj - jsonschema object with ref field resolved.
    :return JSON serializable object with references without external dependencies.
    """
    if isinstance(obj, jsonref.JsonRef):
        obj = resolve_ref_links(obj.__subject__)
        # Omit existing definitions for the external resource since
        # we don't need them anymore.
        if isinstance(obj, dict):
            obj.pop("definitions", None)
            return obj
        else:
            raise ValueError(f"Expected obj to be a dict. Got {obj}")
    elif isinstance(obj, dict):
        return {k: resolve_ref_links(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [resolve_ref_links(item) for item in obj]
    else:
        return obj


def get_ref_resolver_registry(schema: dict[str, Any]) -> Registry:
    """Get a reference resolver registry for the given schema."""
    resource: Resource = Resource.from_contents(
        contents=schema,
        default_specification=DRAFT7,
    )
    return cast(  # Mypy has a hard time detecting this return type.
        "Registry",
        Registry().with_resource(
            uri="",
            resource=resource,
        ),
    )


def _expand_refs(schema: Any, ref_resolver: Resolver) -> None:
    """Internal function to iterate over schema and replace all occurrences of $ref with their definitions. Recursive.

    :param schema: schema that will be patched
    """
    if isinstance(schema, MutableMapping):
        if "$ref" in schema:
            ref_url = schema.pop("$ref")
            definition = ref_resolver.lookup(ref_url).contents
            _expand_refs(
                definition, ref_resolver=ref_resolver
            )  # expand refs in definitions as well
            schema.update(definition)
        else:
            for key, value in schema.items():
                _expand_refs(value, ref_resolver=ref_resolver)
    elif isinstance(schema, List):
        for value in schema:
            _expand_refs(value, ref_resolver=ref_resolver)


def expand_refs(schema: Any) -> None:
    """Iterate over schema and replace all occurrences of $ref with their definitions.

    If a "definitions" section is present at the root of the schema, it will be removed
    after $ref resolution is complete.

    :param schema: schema that will be patched
    """
    ref_resolver = get_ref_resolver_registry(schema).resolver()
    _expand_refs(schema, ref_resolver)
    schema.pop("definitions", None)


def rename_key(schema: Any, old_key: str, new_key: str) -> None:
    """Iterate over nested dictionary and replace one key with another. Used to replace anyOf with oneOf. Recursive.

    :param schema: schema that will be patched
    :param old_key: name of the key to replace
    :param new_key: new name of the key
    """
    if not isinstance(schema, MutableMapping):
        return

    for key, value in schema.items():
        rename_key(value, old_key, new_key)
    if old_key in schema:
        schema[new_key] = schema.pop(old_key)


class ResourceSchemaLoader:
    """JSONSchema loader from package resources"""

    def __init__(self, package_name: str):
        self.package_name = package_name

    def get_schema(self, name: str) -> dict[str, Any]:
        """
        This method retrieves a JSON schema from the schemas/ folder.

        The expected file structure is to have all top-level schemas (corresponding to streams) in the "schemas/" folder, with any shared $refs
        living inside the "schemas/shared/" folder. For example:

        schemas/shared/<shared_definition>.json
        schemas/<name>.json   # contains a $ref to shared_definition
        schemas/<name2>.json  # contains a $ref to shared_definition
        """

        schema_filename = f"schemas/{name}.json"
        raw_file = pkgutil.get_data(self.package_name, schema_filename)
        if not raw_file:
            raise IOError(f"Cannot find file {schema_filename}")
        try:
            raw_schema = json.loads(raw_file)
        except ValueError as err:
            raise RuntimeError(f"Invalid JSON file format for file {schema_filename}") from err

        return self._resolve_schema_references(raw_schema)

    def _resolve_schema_references(self, raw_schema: dict[str, Any]) -> dict[str, Any]:
        """
        Resolve links to external references and move them to the local "definitions" map.

        :param raw_schema jsonschema to lookup for external links.
        :return JSON serializable object with references without external dependencies.
        """

        package = importlib.import_module(self.package_name)
        if package.__file__:
            base = os.path.dirname(package.__file__) + "/"
        else:
            raise ValueError(f"Package {package} does not have a valid __file__ field")
        resolved = jsonref.JsonRef.replace_refs(
            raw_schema, loader=JsonFileLoader(base, "schemas/shared"), base_uri=base
        )
        resolved = resolve_ref_links(resolved)
        if isinstance(resolved, dict):
            return resolved
        else:
            raise ValueError(f"Expected resolved to be a dict. Got {resolved}")


def check_config_against_spec_or_exit(
    config: Mapping[str, Any], spec: ConnectorSpecification
) -> None:
    """
    Check config object against spec. If the config is invalid, raise
    an exception with the validation error description.

    :param config - config loaded from file specified over command line
    :param spec - spec object generated by connector
    """
    spec_schema = spec.connectionSpecification
    try:
        validate(instance=config, schema=spec_schema)
    except ValidationError as validation_error:
        raise AirbyteTracedException(
            message="Config validation error: " + validation_error.message,
            internal_message=validation_error.message,
            failure_type=FailureType.config_error,
        ) from None  # required to prevent logging config secrets from the ValidationError's stacktrace


class InternalConfig(BaseModel):
    KEYWORDS: ClassVar[set[str]] = {"_limit", "_page_size"}
    limit: int = Field(None, alias="_limit")
    page_size: int = Field(None, alias="_page_size")

    def dict(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
        kwargs["by_alias"] = True
        kwargs["exclude_unset"] = True
        return super().dict(*args, **kwargs)

    def is_limit_reached(self, records_counter: int) -> bool:
        """
        Check if the record count has reached the limit set by the internal config.
        :param records_counter - number of records already read
        :return True if limit reached, False otherwise
        """
        if self.limit:
            if records_counter >= self.limit:
                return True
        return False


def split_config(config: Mapping[str, Any]) -> Tuple[dict[str, Any], InternalConfig]:
    """
    Break config map object into 2 instances: first is a dict with user defined
    configuration and second is internal config that contains private keys for
    acceptance test configuration.

    :param
     config - Dict object that has been loaded from config file.

    :return tuple of user defined config dict with filtered out internal
    parameters and connector acceptance test internal config object.
    """
    main_config = {}
    internal_config = {}
    for k, v in config.items():
        if k in InternalConfig.KEYWORDS:
            internal_config[k] = v
        else:
            main_config[k] = v
    return main_config, InternalConfig.parse_obj(internal_config)
class JsonFileLoader:
    """
    Custom json file loader to resolve references to resources located in the "shared" directory.
    We need this for compatibility with existing schemas because all of them have references
    pointing to shared_schema.json instead of shared/shared_schema.json.
    """

    def __init__(self, uri_base: str, shared: str):
        self.shared = shared
        self.uri_base = uri_base

    def __call__(self, uri: str) -> Dict[str, Any]:
        uri = uri.replace(self.uri_base, f"{self.uri_base}/{self.shared}/")
        with open(uri) as f:
            data = json.load(f)
            if isinstance(data, dict):
                return data
            else:
                raise ValueError(f"Expected to read a dictionary from {uri}. Got: {data}")
Custom json file loader to resolve references to resources located in the "shared" directory. We need this for compatibility with existing schemas because all of them have references pointing to shared_schema.json instead of shared/shared_schema.json.
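A small illustrative sketch (the paths are hypothetical and the referenced file must exist): the loader rewrites an incoming URI so the referenced file is read from the shared sub-directory before being loaded as JSON.

from airbyte_cdk.sources.utils.schema_helpers import JsonFileLoader

# Hypothetical connector directory layout.
loader = JsonFileLoader(uri_base="/path/to/source_example", shared="schemas/shared")
# A reference to ".../shared_schema.json" is read from ".../schemas/shared/" instead.
shared_schema = loader("/path/to/source_example/shared_schema.json")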
def resolve_ref_links(obj: Any) -> Any:
    """
    Scan resolved schema and convert jsonref.JsonRef objects to JSON serializable dicts.

    :param obj - jsonschema object with ref field resolved.
    :return JSON serializable object with references without external dependencies.
    """
    if isinstance(obj, jsonref.JsonRef):
        obj = resolve_ref_links(obj.__subject__)
        # Omit existing definitions for the external resource since
        # we don't need them anymore.
        if isinstance(obj, dict):
            obj.pop("definitions", None)
            return obj
        else:
            raise ValueError(f"Expected obj to be a dict. Got {obj}")
    elif isinstance(obj, dict):
        return {k: resolve_ref_links(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [resolve_ref_links(item) for item in obj]
    else:
        return obj
Scan a resolved schema and convert jsonref.JsonRef objects into JSON-serializable dicts.

Parameters
- obj: jsonschema object with the ref field resolved.

Returns
JSON serializable object with references inlined and no external dependencies.
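For illustration, a minimal sketch of where this helper fits (the example schema is made up): jsonref.JsonRef.replace_refs leaves lazy JsonRef proxies in the resolved schema, and resolve_ref_links walks the result to turn them into plain dicts.

import json

import jsonref

from airbyte_cdk.sources.utils.schema_helpers import resolve_ref_links

raw_schema = {
    "definitions": {"id": {"type": "integer"}},
    "properties": {"id": {"$ref": "#/definitions/id"}},
}
resolved = jsonref.JsonRef.replace_refs(raw_schema)  # values may be JsonRef proxies
plain = resolve_ref_links(resolved)                  # proxies become plain dicts
print(json.dumps(plain))                             # now JSON serializable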
def get_ref_resolver_registry(schema: dict[str, Any]) -> Registry:
    """Get a reference resolver registry for the given schema."""
    resource: Resource = Resource.from_contents(
        contents=schema,
        default_specification=DRAFT7,
    )
    return cast(  # Mypy has a hard time detecting this return type.
        "Registry",
        Registry().with_resource(
            uri="",
            resource=resource,
        ),
    )
Get a reference resolver registry for the given schema.
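A minimal sketch of how the returned registry is used elsewhere in this module: build a resolver from it and look up an internal reference (the example schema is made up).

from airbyte_cdk.sources.utils.schema_helpers import get_ref_resolver_registry

registry = get_ref_resolver_registry({"definitions": {"id": {"type": "integer"}}})
resolver = registry.resolver()
print(resolver.lookup("#/definitions/id").contents)  # {'type': 'integer'}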
def expand_refs(schema: Any) -> None:
    """Iterate over schema and replace all occurrences of $ref with their definitions.

    If a "definitions" section is present at the root of the schema, it will be removed
    after $ref resolution is complete.

    :param schema: schema that will be patched
    """
    ref_resolver = get_ref_resolver_registry(schema).resolver()
    _expand_refs(schema, ref_resolver)
    schema.pop("definitions", None)
Iterate over schema and replace all occurrences of $ref with their definitions.
If a "definitions" section is present at the root of the schema, it will be removed after $ref resolution is complete.
Parameters
- schema: schema that will be patched
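A minimal sketch with a made-up schema: the internal $ref is expanded in place and the root "definitions" section is dropped afterwards.

from airbyte_cdk.sources.utils.schema_helpers import expand_refs

schema = {
    "type": "object",
    "properties": {"address": {"$ref": "#/definitions/address"}},
    "definitions": {"address": {"type": "string"}},
}
expand_refs(schema)
# schema is now {"type": "object", "properties": {"address": {"type": "string"}}}
print(schema)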
def rename_key(schema: Any, old_key: str, new_key: str) -> None:
    """Iterate over nested dictionary and replace one key with another. Used to replace anyOf with oneOf. Recursive.

    :param schema: schema that will be patched
    :param old_key: name of the key to replace
    :param new_key: new name of the key
    """
    if not isinstance(schema, MutableMapping):
        return

    for key, value in schema.items():
        rename_key(value, old_key, new_key)
    if old_key in schema:
        schema[new_key] = schema.pop(old_key)
Iterate over a nested dictionary and replace one key with another. Used to replace anyOf with oneOf. Recursive.
Parameters
- schema: schema that will be patched
- old_key: name of the key to replace
- new_key: new name of the key
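A minimal sketch of the documented use case, replacing anyOf with oneOf in a made-up schema:

from airbyte_cdk.sources.utils.schema_helpers import rename_key

schema = {"properties": {"value": {"anyOf": [{"type": "string"}, {"type": "integer"}]}}}
rename_key(schema, old_key="anyOf", new_key="oneOf")
# schema["properties"]["value"] now uses "oneOf" instead of "anyOf"
print(schema)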
class ResourceSchemaLoader:
    """JSONSchema loader from package resources"""

    def __init__(self, package_name: str):
        self.package_name = package_name

    def get_schema(self, name: str) -> dict[str, Any]:
        """
        This method retrieves a JSON schema from the schemas/ folder.

        The expected file structure is to have all top-level schemas (corresponding to streams) in the "schemas/" folder, with any shared $refs
        living inside the "schemas/shared/" folder. For example:

        schemas/shared/<shared_definition>.json
        schemas/<name>.json   # contains a $ref to shared_definition
        schemas/<name2>.json  # contains a $ref to shared_definition
        """

        schema_filename = f"schemas/{name}.json"
        raw_file = pkgutil.get_data(self.package_name, schema_filename)
        if not raw_file:
            raise IOError(f"Cannot find file {schema_filename}")
        try:
            raw_schema = json.loads(raw_file)
        except ValueError as err:
            raise RuntimeError(f"Invalid JSON file format for file {schema_filename}") from err

        return self._resolve_schema_references(raw_schema)

    def _resolve_schema_references(self, raw_schema: dict[str, Any]) -> dict[str, Any]:
        """
        Resolve links to external references and move them to the local "definitions" map.

        :param raw_schema jsonschema to lookup for external links.
        :return JSON serializable object with references without external dependencies.
        """

        package = importlib.import_module(self.package_name)
        if package.__file__:
            base = os.path.dirname(package.__file__) + "/"
        else:
            raise ValueError(f"Package {package} does not have a valid __file__ field")
        resolved = jsonref.JsonRef.replace_refs(
            raw_schema, loader=JsonFileLoader(base, "schemas/shared"), base_uri=base
        )
        resolved = resolve_ref_links(resolved)
        if isinstance(resolved, dict):
            return resolved
        else:
            raise ValueError(f"Expected resolved to be a dict. Got {resolved}")
JSONSchema loader from package resources
    def get_schema(self, name: str) -> dict[str, Any]:
        """
        This method retrieves a JSON schema from the schemas/ folder.

        The expected file structure is to have all top-level schemas (corresponding to streams) in the "schemas/" folder, with any shared $refs
        living inside the "schemas/shared/" folder. For example:

        schemas/shared/<shared_definition>.json
        schemas/<name>.json   # contains a $ref to shared_definition
        schemas/<name2>.json  # contains a $ref to shared_definition
        """

        schema_filename = f"schemas/{name}.json"
        raw_file = pkgutil.get_data(self.package_name, schema_filename)
        if not raw_file:
            raise IOError(f"Cannot find file {schema_filename}")
        try:
            raw_schema = json.loads(raw_file)
        except ValueError as err:
            raise RuntimeError(f"Invalid JSON file format for file {schema_filename}") from err

        return self._resolve_schema_references(raw_schema)
This method retrieves a JSON schema from the schemas/ folder.
The expected file structure is to have all top-level schemas (corresponding to streams) in the "schemas/" folder, with any shared $refs living inside the "schemas/shared/" folder. For example:
schemas/shared/<shared_definition>.json
schemas/<name>.json   # contains a $ref to shared_definition
schemas/<name2>.json  # contains a $ref to shared_definition
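A hypothetical usage sketch, assuming a connector package named source_example that ships the schemas/ folder described above:

from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader

loader = ResourceSchemaLoader(package_name="source_example")  # hypothetical package
users_schema = loader.get_schema("users")  # reads schemas/users.json and inlines shared $refs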
def check_config_against_spec_or_exit(
    config: Mapping[str, Any], spec: ConnectorSpecification
) -> None:
    """
    Check config object against spec. If the config is invalid, raise
    an exception with the validation error description.

    :param config - config loaded from file specified over command line
    :param spec - spec object generated by connector
    """
    spec_schema = spec.connectionSpecification
    try:
        validate(instance=config, schema=spec_schema)
    except ValidationError as validation_error:
        raise AirbyteTracedException(
            message="Config validation error: " + validation_error.message,
            internal_message=validation_error.message,
            failure_type=FailureType.config_error,
        ) from None  # required to prevent logging config secrets from the ValidationError's stacktrace
Check the config object against the spec. If the config is invalid, raises an exception with the validation error description.
Parameters
- config: config loaded from the file specified over the command line
- spec: spec object generated by the connector
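A minimal sketch with a made-up spec: a config value of the wrong type fails validation and the helper raises an AirbyteTracedException with a config_error failure type.

from airbyte_cdk.models import ConnectorSpecification
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

spec = ConnectorSpecification(
    connectionSpecification={
        "type": "object",
        "required": ["api_key"],
        "properties": {"api_key": {"type": "string"}},
    }
)
try:
    check_config_against_spec_or_exit({"api_key": 123}, spec)  # wrong type for api_key
except AirbyteTracedException:
    print("config failed validation")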
class InternalConfig(BaseModel):
    KEYWORDS: ClassVar[set[str]] = {"_limit", "_page_size"}
    limit: int = Field(None, alias="_limit")
    page_size: int = Field(None, alias="_page_size")

    def dict(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
        kwargs["by_alias"] = True
        kwargs["exclude_unset"] = True
        return super().dict(*args, **kwargs)

    def is_limit_reached(self, records_counter: int) -> bool:
        """
        Check if the record count has reached the limit set by the internal config.
        :param records_counter - number of records already read
        :return True if limit reached, False otherwise
        """
        if self.limit:
            if records_counter >= self.limit:
                return True
        return False
    def dict(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
        kwargs["by_alias"] = True
        kwargs["exclude_unset"] = True
        return super().dict(*args, **kwargs)
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
    def is_limit_reached(self, records_counter: int) -> bool:
        """
        Check if the record count has reached the limit set by the internal config.
        :param records_counter - number of records already read
        :return True if limit reached, False otherwise
        """
        if self.limit:
            if records_counter >= self.limit:
                return True
        return False
Check if the record count has reached the limit set by the internal config.

Parameters
- records_counter: number of records already read

Returns
True if the limit is reached, False otherwise.
def split_config(config: Mapping[str, Any]) -> Tuple[dict[str, Any], InternalConfig]:
    """
    Break config map object into 2 instances: first is a dict with user defined
    configuration and second is internal config that contains private keys for
    acceptance test configuration.

    :param
     config - Dict object that has been loaded from config file.

    :return tuple of user defined config dict with filtered out internal
    parameters and connector acceptance test internal config object.
    """
    main_config = {}
    internal_config = {}
    for k, v in config.items():
        if k in InternalConfig.KEYWORDS:
            internal_config[k] = v
        else:
            main_config[k] = v
    return main_config, InternalConfig.parse_obj(internal_config)
Break a config mapping into two instances: the first is a dict with the user-defined configuration, and the second is an internal config that contains private keys for acceptance test configuration.
Parameters
- config: dict object that has been loaded from the config file.

Returns
Tuple of the user-defined config dict with internal parameters filtered out, and the connector acceptance test internal config object.
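A minimal sketch with a made-up config: the internal keys are routed into InternalConfig and everything else stays in the user config.

from airbyte_cdk.sources.utils.schema_helpers import split_config

config = {"api_key": "secret", "_limit": 100, "_page_size": 25}
main_config, internal_config = split_config(config)
# main_config     -> {"api_key": "secret"}
# internal_config -> InternalConfig(limit=100, page_size=25)
print(internal_config.is_limit_reached(150))  # True, since 150 >= 100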