airbyte_cdk.sources.utils.schema_helpers

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5
import importlib
import json
import os
import pkgutil
from copy import deepcopy
from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Mapping, MutableMapping, Optional, Tuple, cast

import jsonref
from jsonschema import validate
from jsonschema.exceptions import ValidationError
from pydantic.v1 import BaseModel, Field
from referencing import Registry, Resource
from referencing._core import Resolver  # used for type hints
from referencing.jsonschema import DRAFT7

from airbyte_cdk.models import ConnectorSpecification, FailureType
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 23
 24
 25class JsonFileLoader:
 26    """
 27    Custom json file loader to resolve references to resources located in "shared" directory.
 28    We need this for compatability with existing schemas cause all of them have references
 29    pointing to shared_schema.json file instead of shared/shared_schema.json
 30    """
 31
 32    def __init__(self, uri_base: str, shared: str):
 33        self.shared = shared
 34        self.uri_base = uri_base
 35
 36    def __call__(self, uri: str) -> Dict[str, Any]:
 37        uri = uri.replace(self.uri_base, f"{self.uri_base}/{self.shared}/")
 38        with open(uri) as f:
 39            data = json.load(f)
 40            if isinstance(data, dict):
 41                return data
 42            else:
 43                raise ValueError(f"Expected to read a dictionary from {uri}. Got: {data}")
 44
 45
 46def resolve_ref_links(obj: Any) -> Any:
 47    """
 48    Scan resolved schema and convert jsonref.JsonRef object to JSON serializable dict.
 49
 50    :param obj - jsonschema object with ref field resolved.
 51    :return JSON serializable object with references without external dependencies.
 52    """
 53    if isinstance(obj, jsonref.JsonRef):
 54        obj = resolve_ref_links(obj.__subject__)
 55        # Omit existing definitions for external resource since
 56        # we dont need it anymore.
 57        if isinstance(obj, dict):
 58            obj.pop("definitions", None)
 59            return obj
 60        else:
 61            raise ValueError(f"Expected obj to be a dict. Got {obj}")
 62    elif isinstance(obj, dict):
 63        return {k: resolve_ref_links(v) for k, v in obj.items()}
 64    elif isinstance(obj, list):
 65        return [resolve_ref_links(item) for item in obj]
 66    else:
 67        return obj
 68
 69
 70def get_ref_resolver_registry(schema: dict[str, Any]) -> Registry:
 71    """Get a reference resolver registry for the given schema."""
 72    resource: Resource = Resource.from_contents(
 73        contents=schema,
 74        default_specification=DRAFT7,
 75    )
 76    return cast(  # Mypy has a hard time detecting this return type.
 77        "Registry",
 78        Registry().with_resource(
 79            uri="",
 80            resource=resource,
 81        ),
 82    )
 83
 84
 85def _expand_refs(schema: Any, ref_resolver: Resolver) -> None:
 86    """Internal function to iterate over schema and replace all occurrences of $ref with their definitions. Recursive.
 87
 88    :param schema: schema that will be patched
 89    """
 90    if isinstance(schema, MutableMapping):
 91        if "$ref" in schema:
 92            ref_url = schema.pop("$ref")
 93            definition = ref_resolver.lookup(ref_url).contents
 94            _expand_refs(
 95                definition, ref_resolver=ref_resolver
 96            )  # expand refs in definitions as well
 97            schema.update(definition)
 98        else:
 99            for key, value in schema.items():
100                _expand_refs(value, ref_resolver=ref_resolver)
101    elif isinstance(schema, List):
102        for value in schema:
103            _expand_refs(value, ref_resolver=ref_resolver)
104
105
def expand_refs(schema: Any) -> None:
    """Replace all occurrences of $ref in the schema with their definitions, in place.

    If a "definitions" section is present at the root of the schema, it will be removed
    after $ref resolution is complete.

    :param schema: schema that will be patched
    """
    resolver = get_ref_resolver_registry(schema).resolver()
    _expand_refs(schema, resolver)
    # Once everything is inlined, the root "definitions" map is no longer needed.
    schema.pop("definitions", None)
117
118
def rename_key(schema: Any, old_key: str, new_key: str) -> None:
    """Iterate over nested dictionary and replace one key with another, in place.
    Used to replace anyOf with oneOf. Recursive.

    Note: only mapping values are traversed; dicts nested inside lists are not visited.

    :param schema: schema that will be patched
    :param old_key: name of the key to replace
    :param new_key: new name of the key
    """
    if not isinstance(schema, MutableMapping):
        return

    # Snapshot the values before recursing: renaming a key while iterating the
    # mapping raises "RuntimeError: dictionary keys changed during iteration"
    # on Python 3.8+.
    for value in list(schema.values()):
        rename_key(value, old_key, new_key)
    if old_key in schema:
        schema[new_key] = schema.pop(old_key)
133
134
class ResourceSchemaLoader:
    """JSONSchema loader from package resources"""

    def __init__(self, package_name: str):
        self.package_name = package_name

    def get_schema(self, name: str) -> dict[str, Any]:
        """
        Retrieve a JSON schema from the package's schemas/ folder.

        The expected file structure is to have all top-level schemas (corresponding to streams) in the "schemas/" folder, with any shared $refs
        living inside the "schemas/shared/" folder. For example:

        schemas/shared/<shared_definition>.json
        schemas/<name>.json # contains a $ref to shared_definition
        schemas/<name2>.json # contains a $ref to shared_definition

        :param name: stream name, i.e. the schema file's basename without extension.
        :raises IOError: if the schema file cannot be found in the package.
        :raises RuntimeError: if the schema file is not valid JSON.
        """
        schema_filename = f"schemas/{name}.json"
        raw_file = pkgutil.get_data(self.package_name, schema_filename)
        if not raw_file:
            raise IOError(f"Cannot find file {schema_filename}")

        try:
            raw_schema = json.loads(raw_file)
        except ValueError as err:
            raise RuntimeError(f"Invalid JSON file format for file {schema_filename}") from err

        return self._resolve_schema_references(raw_schema)

    def _resolve_schema_references(self, raw_schema: dict[str, Any]) -> dict[str, Any]:
        """
        Resolve links to external references and move them into a local "definitions" map.

        :param raw_schema jsonschema to lookup for external links.
        :return JSON serializable object with references without external dependencies.
        """
        package = importlib.import_module(self.package_name)
        if not package.__file__:
            raise ValueError(f"Package {package} does not have a valid __file__ field")
        base = os.path.dirname(package.__file__) + "/"

        resolved = jsonref.JsonRef.replace_refs(
            raw_schema, loader=JsonFileLoader(base, "schemas/shared"), base_uri=base
        )
        resolved = resolve_ref_links(resolved)
        if not isinstance(resolved, dict):
            raise ValueError(f"Expected resolved to be a dict. Got {resolved}")
        return resolved
186
187
def check_config_against_spec_or_exit(
    config: Mapping[str, Any], spec: ConnectorSpecification
) -> None:
    """
    Validate a config object against the connector's spec. If the config is invalid,
    raises an exception that describes the validation error.

    :param config - config loaded from file specified over command line
    :param spec - spec object generated by connector
    """
    try:
        validate(instance=config, schema=spec.connectionSpecification)
    except ValidationError as validation_error:
        # `from None` is required to prevent logging config secrets from the
        # ValidationError's stacktrace.
        raise AirbyteTracedException(
            message="Config validation error: " + validation_error.message,
            internal_message=validation_error.message,
            failure_type=FailureType.config_error,
        ) from None
207
208
class InternalConfig(BaseModel):
    """Private configuration keys (``_limit``, ``_page_size``) used by acceptance tests."""

    # Config keys that belong to the internal config rather than the user-facing config.
    KEYWORDS: ClassVar[set[str]] = {"_limit", "_page_size"}
    # Explicit Optional[int]: pydantic v1 already treated these as optional because
    # their default is None (implicit Optional), but `int = Field(None)` is rejected
    # by strict type checkers. Behavior is unchanged.
    limit: Optional[int] = Field(None, alias="_limit")
    page_size: Optional[int] = Field(None, alias="_page_size")

    def dict(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
        """Serialize the model using field aliases and excluding unset fields."""
        kwargs["by_alias"] = True
        kwargs["exclude_unset"] = True
        return super().dict(*args, **kwargs)

    def is_limit_reached(self, records_counter: int) -> bool:
        """
        Check if record count reached limit set by internal config.

        :param records_counter - number of records already read
        :return True if limit reached, False otherwise
        """
        if self.limit:
            if records_counter >= self.limit:
                return True
        return False
229
230
def split_config(config: Mapping[str, Any]) -> Tuple[dict[str, Any], InternalConfig]:
    """
    Break config map object into 2 instances: first is a dict with user defined
    configuration and second is internal config that contains private keys for
    acceptance test configuration.

    :param
     config - Dict object that has been loaded from config file.

    :return tuple of user defined config dict with filtered out internal
    parameters and connector acceptance test internal config object.
    """
    main_config: dict[str, Any] = {}
    internal_config: dict[str, Any] = {}
    for key, value in config.items():
        # Route each entry to the internal config if its key is one of the
        # reserved internal keywords; otherwise it belongs to the user config.
        target = internal_config if key in InternalConfig.KEYWORDS else main_config
        target[key] = value
    return main_config, InternalConfig.parse_obj(internal_config)
class JsonFileLoader:
26class JsonFileLoader:
27    """
28    Custom json file loader to resolve references to resources located in "shared" directory.
29    We need this for compatability with existing schemas cause all of them have references
30    pointing to shared_schema.json file instead of shared/shared_schema.json
31    """
32
33    def __init__(self, uri_base: str, shared: str):
34        self.shared = shared
35        self.uri_base = uri_base
36
37    def __call__(self, uri: str) -> Dict[str, Any]:
38        uri = uri.replace(self.uri_base, f"{self.uri_base}/{self.shared}/")
39        with open(uri) as f:
40            data = json.load(f)
41            if isinstance(data, dict):
42                return data
43            else:
44                raise ValueError(f"Expected to read a dictionary from {uri}. Got: {data}")

Custom json file loader to resolve references to resources located in "shared" directory. We need this for compatibility with existing schemas because all of them have references pointing to shared_schema.json file instead of shared/shared_schema.json

JsonFileLoader(uri_base: str, shared: str)
33    def __init__(self, uri_base: str, shared: str):
34        self.shared = shared
35        self.uri_base = uri_base
shared
uri_base
def get_ref_resolver_registry(schema: dict[str, typing.Any]) -> referencing._core.Registry:
71def get_ref_resolver_registry(schema: dict[str, Any]) -> Registry:
72    """Get a reference resolver registry for the given schema."""
73    resource: Resource = Resource.from_contents(
74        contents=schema,
75        default_specification=DRAFT7,
76    )
77    return cast(  # Mypy has a hard time detecting this return type.
78        "Registry",
79        Registry().with_resource(
80            uri="",
81            resource=resource,
82        ),
83    )

Get a reference resolver registry for the given schema.

def expand_refs(schema: Any) -> None:
107def expand_refs(schema: Any) -> None:
108    """Iterate over schema and replace all occurrences of $ref with their definitions.
109
110    If a "definitions" section is present at the root of the schema, it will be removed
111    after $ref resolution is complete.
112
113    :param schema: schema that will be patched
114    """
115    ref_resolver = get_ref_resolver_registry(schema).resolver()
116    _expand_refs(schema, ref_resolver)
117    schema.pop("definitions", None)

Iterate over schema and replace all occurrences of $ref with their definitions.

If a "definitions" section is present at the root of the schema, it will be removed after $ref resolution is complete.

Parameters
  • schema: schema that will be patched
def rename_key(schema: Any, old_key: str, new_key: str) -> None:
120def rename_key(schema: Any, old_key: str, new_key: str) -> None:
121    """Iterate over nested dictionary and replace one key with another. Used to replace anyOf with oneOf. Recursive."
122
123    :param schema: schema that will be patched
124    :param old_key: name of the key to replace
125    :param new_key: new name of the key
126    """
127    if not isinstance(schema, MutableMapping):
128        return
129
130    for key, value in schema.items():
131        rename_key(value, old_key, new_key)
132        if old_key in schema:
133            schema[new_key] = schema.pop(old_key)

Iterate over nested dictionary and replace one key with another. Used to replace anyOf with oneOf. Recursive.

Parameters
  • schema: schema that will be patched
  • old_key: name of the key to replace
  • new_key: new name of the key
class ResourceSchemaLoader:
136class ResourceSchemaLoader:
137    """JSONSchema loader from package resources"""
138
139    def __init__(self, package_name: str):
140        self.package_name = package_name
141
142    def get_schema(self, name: str) -> dict[str, Any]:
143        """
144        This method retrieves a JSON schema from the schemas/ folder.
145
146
147        The expected file structure is to have all top-level schemas (corresponding to streams) in the "schemas/" folder, with any shared $refs
148        living inside the "schemas/shared/" folder. For example:
149
150        schemas/shared/<shared_definition>.json
151        schemas/<name>.json # contains a $ref to shared_definition
152        schemas/<name2>.json # contains a $ref to shared_definition
153        """
154
155        schema_filename = f"schemas/{name}.json"
156        raw_file = pkgutil.get_data(self.package_name, schema_filename)
157        if not raw_file:
158            raise IOError(f"Cannot find file {schema_filename}")
159        try:
160            raw_schema = json.loads(raw_file)
161        except ValueError as err:
162            raise RuntimeError(f"Invalid JSON file format for file {schema_filename}") from err
163
164        return self._resolve_schema_references(raw_schema)
165
166    def _resolve_schema_references(self, raw_schema: dict[str, Any]) -> dict[str, Any]:
167        """
168        Resolve links to external references and move it to local "definitions" map.
169
170        :param raw_schema jsonschema to lookup for external links.
171        :return JSON serializable object with references without external dependencies.
172        """
173
174        package = importlib.import_module(self.package_name)
175        if package.__file__:
176            base = os.path.dirname(package.__file__) + "/"
177        else:
178            raise ValueError(f"Package {package} does not have a valid __file__ field")
179        resolved = jsonref.JsonRef.replace_refs(
180            raw_schema, loader=JsonFileLoader(base, "schemas/shared"), base_uri=base
181        )
182        resolved = resolve_ref_links(resolved)
183        if isinstance(resolved, dict):
184            return resolved
185        else:
186            raise ValueError(f"Expected resolved to be a dict. Got {resolved}")

JSONSchema loader from package resources

ResourceSchemaLoader(package_name: str)
139    def __init__(self, package_name: str):
140        self.package_name = package_name
package_name
def get_schema(self, name: str) -> dict[str, typing.Any]:
142    def get_schema(self, name: str) -> dict[str, Any]:
143        """
144        This method retrieves a JSON schema from the schemas/ folder.
145
146
147        The expected file structure is to have all top-level schemas (corresponding to streams) in the "schemas/" folder, with any shared $refs
148        living inside the "schemas/shared/" folder. For example:
149
150        schemas/shared/<shared_definition>.json
151        schemas/<name>.json # contains a $ref to shared_definition
152        schemas/<name2>.json # contains a $ref to shared_definition
153        """
154
155        schema_filename = f"schemas/{name}.json"
156        raw_file = pkgutil.get_data(self.package_name, schema_filename)
157        if not raw_file:
158            raise IOError(f"Cannot find file {schema_filename}")
159        try:
160            raw_schema = json.loads(raw_file)
161        except ValueError as err:
162            raise RuntimeError(f"Invalid JSON file format for file {schema_filename}") from err
163
164        return self._resolve_schema_references(raw_schema)

This method retrieves a JSON schema from the schemas/ folder.

The expected file structure is to have all top-level schemas (corresponding to streams) in the "schemas/" folder, with any shared $refs living inside the "schemas/shared/" folder. For example:

schemas/shared/&lt;shared_definition&gt;.json · schemas/&lt;name&gt;.json # contains a $ref to shared_definition · schemas/&lt;name2&gt;.json # contains a $ref to shared_definition

def check_config_against_spec_or_exit( config: Mapping[str, Any], spec: airbyte_protocol_dataclasses.models.airbyte_protocol.ConnectorSpecification) -> None:
189def check_config_against_spec_or_exit(
190    config: Mapping[str, Any], spec: ConnectorSpecification
191) -> None:
192    """
193    Check config object against spec. In case of spec is invalid, throws
194    an exception with validation error description.
195
196    :param config - config loaded from file specified over command line
197    :param spec - spec object generated by connector
198    """
199    spec_schema = spec.connectionSpecification
200    try:
201        validate(instance=config, schema=spec_schema)
202    except ValidationError as validation_error:
203        raise AirbyteTracedException(
204            message="Config validation error: " + validation_error.message,
205            internal_message=validation_error.message,
206            failure_type=FailureType.config_error,
207        ) from None  # required to prevent logging config secrets from the ValidationError's stacktrace

Check config object against spec. In case of spec is invalid, throws an exception with validation error description.

:param config - config loaded from file specified over command line :param spec - spec object generated by connector

class InternalConfig(pydantic.v1.main.BaseModel):
210class InternalConfig(BaseModel):
211    KEYWORDS: ClassVar[set[str]] = {"_limit", "_page_size"}
212    limit: int = Field(None, alias="_limit")
213    page_size: int = Field(None, alias="_page_size")
214
215    def dict(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
216        kwargs["by_alias"] = True
217        kwargs["exclude_unset"] = True
218        return super().dict(*args, **kwargs)
219
220    def is_limit_reached(self, records_counter: int) -> bool:
221        """
222        Check if record count reached limit set by internal config.
223        :param records_counter - number of records already red
224        :return True if limit reached, False otherwise
225        """
226        if self.limit:
227            if records_counter >= self.limit:
228                return True
229        return False
KEYWORDS: ClassVar[set[str]] = {'_limit', '_page_size'}
limit: int
page_size: int
def dict(self, *args: Any, **kwargs: Any) -> dict[str, typing.Any]:
215    def dict(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
216        kwargs["by_alias"] = True
217        kwargs["exclude_unset"] = True
218        return super().dict(*args, **kwargs)

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

def is_limit_reached(self, records_counter: int) -> bool:
220    def is_limit_reached(self, records_counter: int) -> bool:
221        """
222        Check if record count reached limit set by internal config.
223        :param records_counter - number of records already red
224        :return True if limit reached, False otherwise
225        """
226        if self.limit:
227            if records_counter >= self.limit:
228                return True
229        return False

Check if record count reached limit set by internal config. :param records_counter - number of records already read :return True if limit reached, False otherwise

def split_config( config: Mapping[str, Any]) -> Tuple[dict[str, Any], InternalConfig]:
232def split_config(config: Mapping[str, Any]) -> Tuple[dict[str, Any], InternalConfig]:
233    """
234    Break config map object into 2 instances: first is a dict with user defined
235    configuration and second is internal config that contains private keys for
236    acceptance test configuration.
237
238    :param
239     config - Dict object that has been loaded from config file.
240
241    :return tuple of user defined config dict with filtered out internal
242    parameters and connector acceptance test internal config object.
243    """
244    main_config = {}
245    internal_config = {}
246    for k, v in config.items():
247        if k in InternalConfig.KEYWORDS:
248            internal_config[k] = v
249        else:
250            main_config[k] = v
251    return main_config, InternalConfig.parse_obj(internal_config)

Break config map object into 2 instances: first is a dict with user defined configuration and second is internal config that contains private keys for acceptance test configuration.

:param config - Dict object that has been loaded from config file.

:return tuple of user defined config dict with filtered out internal parameters and connector acceptance test internal config object.