airbyte_cdk.sources.utils.schema_helpers

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import importlib
import json
import os
import pkgutil
from typing import Any, ClassVar, Dict, List, Mapping, MutableMapping, Optional, Tuple

import jsonref
from jsonschema import RefResolver, validate
from jsonschema.exceptions import ValidationError
from pydantic.v1 import BaseModel, Field

from airbyte_cdk.models import ConnectorSpecification, FailureType
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

class JsonFileLoader:
    """
    Custom JSON file loader that resolves references to resources located in the "shared" directory.
    We need this for compatibility with existing schemas, because all of them have references
    pointing to shared_schema.json instead of shared/shared_schema.json.
    """

    def __init__(self, uri_base: str, shared: str):
        self.shared = shared
        self.uri_base = uri_base

    def __call__(self, uri: str) -> Dict[str, Any]:
        uri = uri.replace(self.uri_base, f"{self.uri_base}/{self.shared}/")
        with open(uri) as f:
            data = json.load(f)
            if isinstance(data, dict):
                return data
            else:
                raise ValueError(f"Expected to read a dictionary from {uri}. Got: {data}")


def resolve_ref_links(obj: Any) -> Any:
    """
    Scan a resolved schema and convert jsonref.JsonRef objects into JSON-serializable dicts.

    :param obj: jsonschema object with its $ref fields resolved.
    :return: JSON-serializable object whose references have no external dependencies.
    """
    if isinstance(obj, jsonref.JsonRef):
        obj = resolve_ref_links(obj.__subject__)
        # Omit existing definitions for the external resource since
        # we don't need them anymore.
        if isinstance(obj, dict):
            obj.pop("definitions", None)
            return obj
        else:
            raise ValueError(f"Expected obj to be a dict. Got {obj}")
    elif isinstance(obj, dict):
        return {k: resolve_ref_links(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [resolve_ref_links(item) for item in obj]
    else:
        return obj


def _expand_refs(schema: Any, ref_resolver: Optional[RefResolver] = None) -> None:
    """Internal function that iterates over a schema and replaces all occurrences of $ref with their definitions. Recursive.

    :param schema: schema that will be patched
    :param ref_resolver: resolver used to look up each $ref's definition; if None is passed, one will be instantiated
    """
    ref_resolver = ref_resolver or RefResolver.from_schema(schema)

    if isinstance(schema, MutableMapping):
        if "$ref" in schema:
            ref_url = schema.pop("$ref")
            _, definition = ref_resolver.resolve(ref_url)
            _expand_refs(
                definition, ref_resolver=ref_resolver
            )  # expand refs in definitions as well
            schema.update(definition)
        else:
            for key, value in schema.items():
                _expand_refs(value, ref_resolver=ref_resolver)
    elif isinstance(schema, List):
        for value in schema:
            _expand_refs(value, ref_resolver=ref_resolver)


def expand_refs(schema: Any) -> None:
    """Iterate over schema and replace all occurrences of $ref with their definitions.

    :param schema: schema that will be patched
    """
    _expand_refs(schema)
    schema.pop("definitions", None)  # remove definitions created by $ref


def rename_key(schema: Any, old_key: str, new_key: str) -> None:
    """Iterate over a nested dictionary and replace one key with another. Used to replace anyOf with oneOf. Recursive.

    :param schema: schema that will be patched
    :param old_key: name of the key to replace
    :param new_key: new name of the key
    """
    if not isinstance(schema, MutableMapping):
        return

    for key, value in schema.items():
        rename_key(value, old_key, new_key)
        if old_key in schema:
            schema[new_key] = schema.pop(old_key)


class ResourceSchemaLoader:
    """JSONSchema loader from package resources"""

    def __init__(self, package_name: str):
        self.package_name = package_name

    def get_schema(self, name: str) -> dict[str, Any]:
        """
        This method retrieves a JSON schema from the schemas/ folder.

        The expected file structure is to have all top-level schemas (corresponding to streams) in the "schemas/" folder, with any shared $refs
        living inside the "schemas/shared/" folder. For example:

        schemas/shared/<shared_definition>.json
        schemas/<name>.json # contains a $ref to shared_definition
        schemas/<name2>.json # contains a $ref to shared_definition
        """

        schema_filename = f"schemas/{name}.json"
        raw_file = pkgutil.get_data(self.package_name, schema_filename)
        if not raw_file:
            raise IOError(f"Cannot find file {schema_filename}")
        try:
            raw_schema = json.loads(raw_file)
        except ValueError as err:
            raise RuntimeError(f"Invalid JSON file format for file {schema_filename}") from err

        return self._resolve_schema_references(raw_schema)

    def _resolve_schema_references(self, raw_schema: dict[str, Any]) -> dict[str, Any]:
        """
        Resolve links to external references and move them to the local "definitions" map.

        :param raw_schema: jsonschema in which to look up external links.
        :return: JSON-serializable object whose references have no external dependencies.
        """

        package = importlib.import_module(self.package_name)
        if package.__file__:
            base = os.path.dirname(package.__file__) + "/"
        else:
            raise ValueError(f"Package {package} does not have a valid __file__ field")
        resolved = jsonref.JsonRef.replace_refs(
            raw_schema, loader=JsonFileLoader(base, "schemas/shared"), base_uri=base
        )
        resolved = resolve_ref_links(resolved)
        if isinstance(resolved, dict):
            return resolved
        else:
            raise ValueError(f"Expected resolved to be a dict. Got {resolved}")


def check_config_against_spec_or_exit(
    config: Mapping[str, Any], spec: ConnectorSpecification
) -> None:
    """
    Check a config object against the spec. If the config is invalid, throws
    an exception with a description of the validation error.

    :param config: config loaded from the file specified on the command line
    :param spec: spec object generated by the connector
    """
    spec_schema = spec.connectionSpecification
    try:
        validate(instance=config, schema=spec_schema)
    except ValidationError as validation_error:
        raise AirbyteTracedException(
            message="Config validation error: " + validation_error.message,
            internal_message=validation_error.message,
            failure_type=FailureType.config_error,
        ) from None  # required to prevent logging config secrets from the ValidationError's stacktrace


class InternalConfig(BaseModel):
    KEYWORDS: ClassVar[set[str]] = {"_limit", "_page_size"}
    limit: int = Field(None, alias="_limit")
    page_size: int = Field(None, alias="_page_size")

    def dict(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
        kwargs["by_alias"] = True
        kwargs["exclude_unset"] = True
        return super().dict(*args, **kwargs)

    def is_limit_reached(self, records_counter: int) -> bool:
        """
        Check if the record count has reached the limit set by the internal config.
        :param records_counter: number of records already read
        :return: True if the limit is reached, False otherwise
        """
        if self.limit:
            if records_counter >= self.limit:
                return True
        return False


def split_config(config: Mapping[str, Any]) -> Tuple[dict[str, Any], InternalConfig]:
    """
    Break a config map object into two instances: the first is a dict with user-defined
    configuration, and the second is the internal config that contains private keys for
    acceptance test configuration.

    :param config: Dict object that has been loaded from the config file.

    :return: tuple of the user-defined config dict with internal parameters filtered out
    and the connector acceptance test internal config object.
    """
    main_config = {}
    internal_config = {}
    for k, v in config.items():
        if k in InternalConfig.KEYWORDS:
            internal_config[k] = v
        else:
            main_config[k] = v
    return main_config, InternalConfig.parse_obj(internal_config)

class JsonFileLoader:

Custom JSON file loader that resolves references to resources located in the "shared" directory. We need this for compatibility with existing schemas, because all of them have references pointing to shared_schema.json instead of shared/shared_schema.json.

JsonFileLoader(uri_base: str, shared: str)
shared
uri_base
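
For illustration, a minimal sketch of the loader in action. The /path/to/package base and the file name are hypothetical, and the call assumes the redirected file actually exists inside the shared folder:

    loader = JsonFileLoader(uri_base="/path/to/package/", shared="schemas/shared")

    # A reference that jsonref resolves to "/path/to/package/shared_schema.json"
    # is redirected into "schemas/shared/" and loaded as a dict.
    shared_schema = loader("/path/to/package/shared_schema.json")
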
def expand_refs(schema: Any) -> None:

Iterate over schema and replace all occurrences of $ref with their definitions.

Parameters
  • schema: schema that will be patched
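
A self-contained sketch of the in-place expansion; the schema below is made up for illustration:

    schema = {
        "type": "object",
        "properties": {"user": {"$ref": "#/definitions/user"}},
        "definitions": {"user": {"type": "string"}},
    }
    expand_refs(schema)
    # The $ref was replaced by its definition, and the now-unused
    # "definitions" section was removed.
    assert schema == {"type": "object", "properties": {"user": {"type": "string"}}}
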
def rename_key(schema: Any, old_key: str, new_key: str) -> None:

Iterate over a nested dictionary and replace one key with another. Used to replace anyOf with oneOf. Recursive.

Parameters
  • schema: schema that will be patched
  • old_key: name of the key to replace
  • new_key: new name of the key
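
A small in-place example with a made-up schema fragment:

    schema = {
        "type": "object",
        "properties": {"value": {"anyOf": [{"type": "string"}, {"type": "number"}]}},
    }
    rename_key(schema, old_key="anyOf", new_key="oneOf")
    # The nested key was renamed in place.
    assert "oneOf" in schema["properties"]["value"]
    assert "anyOf" not in schema["properties"]["value"]
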
class ResourceSchemaLoader:

JSONSchema loader from package resources

ResourceSchemaLoader(package_name: str)
package_name
def get_schema(self, name: str) -> dict[str, typing.Any]:

This method retrieves a JSON schema from the schemas/ folder.

The expected file structure is to have all top-level schemas (corresponding to streams) in the "schemas/" folder, with any shared $refs living inside the "schemas/shared/" folder. For example:

schemas/shared/<shared_definition>.json
schemas/<name>.json # contains a $ref to shared_definition
schemas/<name2>.json # contains a $ref to shared_definition
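
A hedged usage sketch: "source_example" and the "users" stream below are invented names, and the call assumes that package ships a schemas/users.json resource:

    loader = ResourceSchemaLoader(package_name="source_example")

    # Reads schemas/users.json from the package via pkgutil.get_data and
    # resolves any $refs pointing into schemas/shared/ before returning a dict.
    users_schema = loader.get_schema("users")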

def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: airbyte_protocol_dataclasses.models.airbyte_protocol.ConnectorSpecification) -> None:

Check a config object against the spec. If the config is invalid, throws an exception with a description of the validation error.

Parameters
  • config: config loaded from the file specified on the command line
  • spec: spec object generated by the connector
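
A minimal sketch, assuming a spec whose connectionSpecification requires a single "api_key" string field; both the spec and the configs are invented for illustration:

    from airbyte_cdk.models import ConnectorSpecification

    spec = ConnectorSpecification(
        connectionSpecification={
            "type": "object",
            "required": ["api_key"],
            "properties": {"api_key": {"type": "string"}},
        }
    )

    check_config_against_spec_or_exit({"api_key": "secret"}, spec)  # passes silently
    # An invalid config such as {} would raise an AirbyteTracedException
    # with failure_type=FailureType.config_error.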

class InternalConfig(pydantic.v1.main.BaseModel):
KEYWORDS: ClassVar[set[str]] = {'_limit', '_page_size'}
limit: int
page_size: int
def dict(self, *args: Any, **kwargs: Any) -> dict[str, typing.Any]:

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
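
A short illustration of the overridden behavior, with an invented value: fields come back under their aliases, and unset fields are dropped:

    config = InternalConfig(_limit=10)
    # page_size was never set, so exclude_unset drops it; by_alias keeps "_limit".
    assert config.dict() == {"_limit": 10}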

def is_limit_reached(self, records_counter: int) -> bool:

Check if the record count has reached the limit set by the internal config.

Parameters
  • records_counter: number of records already read

Returns True if the limit is reached, False otherwise.
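
For illustration, with invented numbers:

    config = InternalConfig(_limit=100)
    assert config.is_limit_reached(100) is True
    assert config.is_limit_reached(99) is False
    # With no limit configured, the check is always False.
    assert InternalConfig().is_limit_reached(10**9) is False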

def split_config(config: Mapping[str, Any]) -> Tuple[dict[str, Any], InternalConfig]:

Break a config map object into two instances: the first is a dict with user-defined configuration, and the second is the internal config that contains private keys for acceptance test configuration.

Parameters
  • config: Dict object that has been loaded from the config file.

Returns a tuple of the user-defined config dict with internal parameters filtered out and the connector acceptance test internal config object.
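
A small end-to-end sketch with an invented config:

    config = {"api_key": "secret", "_limit": 100, "_page_size": 10}
    main_config, internal_config = split_config(config)
    # Internal keywords are filtered out of the user config...
    assert main_config == {"api_key": "secret"}
    # ...and parsed into the InternalConfig model by alias.
    assert internal_config.limit == 100
    assert internal_config.page_size == 10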