airbyte_cdk.sources.utils.schema_helpers
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

"""Helpers for loading, resolving and validating JSON schemas and connector configs."""

import importlib
import json
import os
import pkgutil
from typing import Any, ClassVar, Dict, List, Mapping, MutableMapping, Optional, Tuple

import jsonref
from jsonschema import RefResolver, validate
from jsonschema.exceptions import ValidationError
from pydantic.v1 import BaseModel, Field

from airbyte_cdk.models import ConnectorSpecification, FailureType
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


class JsonFileLoader:
    """
    Custom json file loader to resolve references to resources located in "shared" directory.
    We need this for compatibility with existing schemas because all of them have references
    pointing to shared_schema.json file instead of shared/shared_schema.json
    """

    def __init__(self, uri_base: str, shared: str):
        self.shared = shared
        self.uri_base = uri_base

    def __call__(self, uri: str) -> Dict[str, Any]:
        # Redirect the resolved URI into the shared sub-directory.
        uri = uri.replace(self.uri_base, f"{self.uri_base}/{self.shared}/")
        with open(uri) as f:
            data = json.load(f)
            if isinstance(data, dict):
                return data
            else:
                raise ValueError(f"Expected to read a dictionary from {uri}. Got: {data}")


def resolve_ref_links(obj: Any) -> Any:
    """
    Scan resolved schema and convert jsonref.JsonRef object to JSON serializable dict.

    :param obj: jsonschema object with ref field resolved.
    :return: JSON serializable object with references without external dependencies.
    :raises ValueError: if a resolved reference is not a dict.
    """
    if isinstance(obj, jsonref.JsonRef):
        obj = resolve_ref_links(obj.__subject__)
        # Omit existing definitions for external resource since
        # we dont need it anymore.
        if isinstance(obj, dict):
            obj.pop("definitions", None)
            return obj
        else:
            raise ValueError(f"Expected obj to be a dict. Got {obj}")
    elif isinstance(obj, dict):
        return {k: resolve_ref_links(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [resolve_ref_links(item) for item in obj]
    else:
        return obj


def _expand_refs(schema: Any, ref_resolver: Optional[RefResolver] = None) -> None:
    """Internal function to iterate over schema and replace all occurrences of $ref with their definitions. Recursive.

    :param schema: schema that will be patched
    :param ref_resolver: resolver to get definition from $ref; instantiated from the schema when None
    """
    ref_resolver = ref_resolver or RefResolver.from_schema(schema)

    if isinstance(schema, MutableMapping):
        if "$ref" in schema:
            ref_url = schema.pop("$ref")
            _, definition = ref_resolver.resolve(ref_url)
            # Expand refs inside the definition as well before inlining it.
            _expand_refs(definition, ref_resolver=ref_resolver)
            schema.update(definition)
        else:
            for value in schema.values():
                _expand_refs(value, ref_resolver=ref_resolver)
    elif isinstance(schema, List):
        for value in schema:
            _expand_refs(value, ref_resolver=ref_resolver)


def expand_refs(schema: Any) -> None:
    """Iterate over schema and replace all occurrences of $ref with their definitions. Mutates ``schema`` in place.

    :param schema: schema that will be patched
    """
    _expand_refs(schema)
    schema.pop("definitions", None)  # remove definitions created by $ref


def rename_key(schema: Any, old_key: str, new_key: str) -> None:
    """Iterate over nested dictionary and replace one key with another. Used to replace anyOf with oneOf. Recursive.

    Mutates ``schema`` in place; does not descend into lists.

    :param schema: schema that will be patched
    :param old_key: name of the key to replace
    :param new_key: new name of the key
    """
    if not isinstance(schema, MutableMapping):
        return

    # Iterate over a snapshot so the rename below never mutates the dict mid-iteration.
    for value in list(schema.values()):
        rename_key(value, old_key, new_key)
    if old_key in schema:
        schema[new_key] = schema.pop(old_key)


class ResourceSchemaLoader:
    """JSONSchema loader from package resources"""

    def __init__(self, package_name: str):
        self.package_name = package_name

    def get_schema(self, name: str) -> dict[str, Any]:
        """
        This method retrieves a JSON schema from the schemas/ folder.

        The expected file structure is to have all top-level schemas (corresponding to streams) in the "schemas/" folder, with any shared $refs
        living inside the "schemas/shared/" folder. For example:

            schemas/shared/<shared_definition>.json
            schemas/<name>.json   # contains a $ref to shared_definition
            schemas/<name2>.json  # contains a $ref to shared_definition

        :param name: stream name; resolves to schemas/<name>.json within the package.
        :raises IOError: if the schema file cannot be found in the package.
        :raises RuntimeError: if the schema file contains invalid JSON.
        """
        schema_filename = f"schemas/{name}.json"
        # pkgutil.get_data returns None when the resource is missing.
        raw_file = pkgutil.get_data(self.package_name, schema_filename)
        if not raw_file:
            raise IOError(f"Cannot find file {schema_filename}")
        try:
            raw_schema = json.loads(raw_file)
        except ValueError as err:
            raise RuntimeError(f"Invalid JSON file format for file {schema_filename}") from err

        return self._resolve_schema_references(raw_schema)

    def _resolve_schema_references(self, raw_schema: dict[str, Any]) -> dict[str, Any]:
        """
        Resolve links to external references and move it to local "definitions" map.

        :param raw_schema: jsonschema to lookup for external links.
        :return: JSON serializable object with references without external dependencies.
        :raises ValueError: if the package has no __file__ or resolution does not yield a dict.
        """
        package = importlib.import_module(self.package_name)
        if package.__file__:
            base = os.path.dirname(package.__file__) + "/"
        else:
            raise ValueError(f"Package {package} does not have a valid __file__ field")
        resolved = jsonref.JsonRef.replace_refs(
            raw_schema, loader=JsonFileLoader(base, "schemas/shared"), base_uri=base
        )
        resolved = resolve_ref_links(resolved)
        if isinstance(resolved, dict):
            return resolved
        else:
            raise ValueError(f"Expected resolved to be a dict. Got {resolved}")


def check_config_against_spec_or_exit(
    config: Mapping[str, Any], spec: ConnectorSpecification
) -> None:
    """
    Check config object against spec. In case of spec is invalid, throws
    an exception with validation error description.

    :param config: config loaded from file specified over command line
    :param spec: spec object generated by connector
    :raises AirbyteTracedException: if the config does not validate against the spec.
    """
    spec_schema = spec.connectionSpecification
    try:
        validate(instance=config, schema=spec_schema)
    except ValidationError as validation_error:
        raise AirbyteTracedException(
            message="Config validation error: " + validation_error.message,
            internal_message=validation_error.message,
            failure_type=FailureType.config_error,
        ) from None  # required to prevent logging config secrets from the ValidationError's stacktrace


class InternalConfig(BaseModel):
    """Internal (acceptance-test) configuration keys split out of a connector config."""

    # Config keys treated as internal test parameters rather than connector configuration.
    KEYWORDS: ClassVar[set[str]] = {"_limit", "_page_size"}
    limit: Optional[int] = Field(None, alias="_limit")
    page_size: Optional[int] = Field(None, alias="_page_size")

    def dict(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
        """Serialize using field aliases and exclude fields that were never set."""
        kwargs["by_alias"] = True
        kwargs["exclude_unset"] = True
        return super().dict(*args, **kwargs)

    def is_limit_reached(self, records_counter: int) -> bool:
        """
        Check if record count reached limit set by internal config.

        :param records_counter: number of records already read
        :return: True if limit reached, False otherwise
        """
        # A falsy limit (None/0) means no limit was configured.
        return bool(self.limit and records_counter >= self.limit)


def split_config(config: Mapping[str, Any]) -> Tuple[dict[str, Any], InternalConfig]:
    """
    Break config map object into 2 instances: first is a dict with user defined
    configuration and second is internal config that contains private keys for
    acceptance test configuration.

    :param config: Dict object that has been loaded from config file.
    :return: tuple of user defined config dict with filtered out internal
        parameters and connector acceptance test internal config object.
    """
    main_config: dict[str, Any] = {}
    internal_config: dict[str, Any] = {}
    for key, value in config.items():
        target = internal_config if key in InternalConfig.KEYWORDS else main_config
        target[key] = value
    return main_config, InternalConfig.parse_obj(internal_config)
class JsonFileLoader:
    """
    Custom json file loader that resolves references to resources stored in the "shared" directory.

    Kept for compatibility with existing schemas, whose references point at
    shared_schema.json rather than shared/shared_schema.json.
    """

    def __init__(self, uri_base: str, shared: str):
        self.shared = shared
        self.uri_base = uri_base

    def __call__(self, uri: str) -> Dict[str, Any]:
        # Redirect the lookup into the shared sub-directory.
        uri = uri.replace(self.uri_base, f"{self.uri_base}/{self.shared}/")
        with open(uri) as handle:
            payload = json.load(handle)
            if not isinstance(payload, dict):
                raise ValueError(f"Expected to read a dictionary from {uri}. Got: {payload}")
            return payload
Custom json file loader to resolve references to resources located in the "shared" directory. We need this for compatibility with existing schemas because all of them have references pointing to the shared_schema.json file instead of shared/shared_schema.json.
def resolve_ref_links(obj: Any) -> Any:
    """
    Walk a resolved schema and turn jsonref.JsonRef wrappers into plain, JSON-serializable dicts.

    :param obj: jsonschema object with ref field resolved.
    :return: JSON serializable object with references without external dependencies.
    :raises ValueError: if a resolved reference does not resolve to a dict.
    """
    if isinstance(obj, jsonref.JsonRef):
        obj = resolve_ref_links(obj.__subject__)
        if not isinstance(obj, dict):
            raise ValueError(f"Expected obj to be a dict. Got {obj}")
        # The external resource's own "definitions" are no longer needed once inlined.
        obj.pop("definitions", None)
        return obj
    if isinstance(obj, dict):
        return {name: resolve_ref_links(child) for name, child in obj.items()}
    if isinstance(obj, list):
        return [resolve_ref_links(child) for child in obj]
    return obj
Scan resolved schema and convert jsonref.JsonRef object to JSON serializable dict.
:param obj - jsonschema object with ref field resolved. :return JSON serializable object with references without external dependencies.
def expand_refs(schema: Any) -> None:
    """Iterate over schema and replace all occurrences of $ref with their definitions.

    Mutates ``schema`` in place.

    :param schema: schema that will be patched
    """
    _expand_refs(schema)
    schema.pop("definitions", None)  # remove definitions created by $ref
Iterate over schema and replace all occurrences of $ref with their definitions.
Parameters
- schema: schema that will be patched
100def rename_key(schema: Any, old_key: str, new_key: str) -> None: 101 """Iterate over nested dictionary and replace one key with another. Used to replace anyOf with oneOf. Recursive." 102 103 :param schema: schema that will be patched 104 :param old_key: name of the key to replace 105 :param new_key: new name of the key 106 """ 107 if not isinstance(schema, MutableMapping): 108 return 109 110 for key, value in schema.items(): 111 rename_key(value, old_key, new_key) 112 if old_key in schema: 113 schema[new_key] = schema.pop(old_key)
Iterate over a nested dictionary and replace one key with another. Used to replace anyOf with oneOf. Recursive.
Parameters
- schema: schema that will be patched
- old_key: name of the key to replace
- new_key: new name of the key
class ResourceSchemaLoader:
    """JSONSchema loader from package resources"""

    def __init__(self, package_name: str):
        self.package_name = package_name

    def get_schema(self, name: str) -> dict[str, Any]:
        """
        Retrieve the JSON schema for ``name`` from the package's schemas/ folder.

        All top-level schemas (corresponding to streams) live in "schemas/", with any
        shared $refs inside "schemas/shared/". For example:

            schemas/shared/<shared_definition>.json
            schemas/<name>.json   # contains a $ref to shared_definition
            schemas/<name2>.json  # contains a $ref to shared_definition
        """
        schema_filename = f"schemas/{name}.json"
        raw_file = pkgutil.get_data(self.package_name, schema_filename)
        if not raw_file:
            raise IOError(f"Cannot find file {schema_filename}")
        try:
            raw_schema = json.loads(raw_file)
        except ValueError as err:
            raise RuntimeError(f"Invalid JSON file format for file {schema_filename}") from err
        return self._resolve_schema_references(raw_schema)

    def _resolve_schema_references(self, raw_schema: dict[str, Any]) -> dict[str, Any]:
        """
        Resolve links to external references and move them into a local "definitions" map.

        :param raw_schema: jsonschema to lookup for external links.
        :return: JSON serializable object with references without external dependencies.
        """
        package = importlib.import_module(self.package_name)
        if not package.__file__:
            raise ValueError(f"Package {package} does not have a valid __file__ field")
        base = os.path.dirname(package.__file__) + "/"
        resolved = jsonref.JsonRef.replace_refs(
            raw_schema, loader=JsonFileLoader(base, "schemas/shared"), base_uri=base
        )
        resolved = resolve_ref_links(resolved)
        if not isinstance(resolved, dict):
            raise ValueError(f"Expected resolved to be a dict. Got {resolved}")
        return resolved
JSONSchema loader from package resources
def get_schema(self, name: str) -> dict[str, Any]:
    """
    This method retrieves a JSON schema from the schemas/ folder.

    The expected file structure is to have all top-level schemas (corresponding to streams) in the "schemas/" folder, with any shared $refs
    living inside the "schemas/shared/" folder. For example:

        schemas/shared/<shared_definition>.json
        schemas/<name>.json   # contains a $ref to shared_definition
        schemas/<name2>.json  # contains a $ref to shared_definition

    :param name: stream name; resolves to schemas/<name>.json within the package.
    :raises IOError: if the schema file cannot be found in the package.
    :raises RuntimeError: if the schema file contains invalid JSON.
    """
    schema_filename = f"schemas/{name}.json"
    # pkgutil.get_data returns None when the resource is missing.
    raw_file = pkgutil.get_data(self.package_name, schema_filename)
    if not raw_file:
        raise IOError(f"Cannot find file {schema_filename}")
    try:
        raw_schema = json.loads(raw_file)
    except ValueError as err:
        raise RuntimeError(f"Invalid JSON file format for file {schema_filename}") from err

    return self._resolve_schema_references(raw_schema)
This method retrieves a JSON schema from the schemas/ folder.
The expected file structure is to have all top-level schemas (corresponding to streams) in the "schemas/" folder, with any shared $refs living inside the "schemas/shared/" folder. For example:
schemas/shared/&lt;shared_definition&gt;.json
schemas/&lt;name&gt;.json   # contains a $ref to shared_definition
schemas/&lt;name2&gt;.json  # contains a $ref to shared_definition
def check_config_against_spec_or_exit(
    config: Mapping[str, Any], spec: ConnectorSpecification
) -> None:
    """
    Validate ``config`` against the connector specification; raise a traced
    config error describing the validation failure when it does not conform.

    :param config: config loaded from file specified over command line
    :param spec: spec object generated by connector
    """
    try:
        validate(instance=config, schema=spec.connectionSpecification)
    except ValidationError as validation_error:
        # "from None" is required to prevent logging config secrets from the
        # ValidationError's stacktrace.
        raise AirbyteTracedException(
            message="Config validation error: " + validation_error.message,
            internal_message=validation_error.message,
            failure_type=FailureType.config_error,
        ) from None
Check config object against spec. In case of spec is invalid, throws an exception with validation error description.
:param config - config loaded from file specified over command line :param spec - spec object generated by connector
class InternalConfig(BaseModel):
    """Internal (acceptance-test) configuration keys split out of a connector config."""

    # Config keys treated as internal test parameters rather than connector configuration.
    KEYWORDS: ClassVar[set[str]] = {"_limit", "_page_size"}
    limit: int = Field(None, alias="_limit")
    page_size: int = Field(None, alias="_page_size")

    def dict(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
        """Serialize with field aliases, omitting fields that were never set."""
        kwargs.update(by_alias=True, exclude_unset=True)
        return super().dict(*args, **kwargs)

    def is_limit_reached(self, records_counter: int) -> bool:
        """
        Check if record count reached limit set by internal config.

        :param records_counter: number of records already read
        :return: True if limit reached, False otherwise
        """
        # A falsy limit (None/0) means no limit was configured.
        return bool(self.limit and records_counter >= self.limit)
def dict(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
    """Serialize the model using field aliases and excluding unset fields."""
    # Force alias-based, unset-excluded serialization regardless of caller kwargs.
    kwargs.update(by_alias=True, exclude_unset=True)
    return super().dict(*args, **kwargs)
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
def is_limit_reached(self, records_counter: int) -> bool:
    """
    Report whether the number of records read so far has hit the internal limit.

    :param records_counter: number of records already read
    :return: True if limit reached, False otherwise
    """
    # A falsy limit (None/0) means no limit was configured.
    return bool(self.limit and records_counter >= self.limit)
Check if record count reached limit set by internal config. :param records_counter - number of records already read :return True if limit reached, False otherwise
def split_config(config: Mapping[str, Any]) -> Tuple[dict[str, Any], InternalConfig]:
    """
    Break config map object into 2 instances: first is a dict with user defined
    configuration and second is internal config that contains private keys for
    acceptance test configuration.

    :param config: Dict object that has been loaded from config file.
    :return: tuple of user defined config dict with filtered out internal
        parameters and connector acceptance test internal config object.
    """
    internal_keys = InternalConfig.KEYWORDS
    internal_config = {key: value for key, value in config.items() if key in internal_keys}
    main_config = {key: value for key, value in config.items() if key not in internal_keys}
    return main_config, InternalConfig.parse_obj(internal_config)
Break config map object into 2 instances: first is a dict with user defined configuration and second is internal config that contains private keys for acceptance test configuration.
:param config - Dict object that has been loaded from config file.
:return tuple of user defined config dict with filtered out internal parameters and connector acceptance test internal config object.