airbyte_cdk.sources.declarative.schema

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from airbyte_cdk.sources.declarative.schema.default_schema_loader import DefaultSchemaLoader
 6from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import (
 7    ComplexFieldType,
 8    DynamicSchemaLoader,
 9    SchemaTypeIdentifier,
10    TypesMap,
11)
12from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader
13from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
14from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
15
16__all__ = [  # public names re-exported by airbyte_cdk.sources.declarative.schema
17    "JsonFileSchemaLoader",
18    "DefaultSchemaLoader",
19    "SchemaLoader",
20    "InlineSchemaLoader",
21    "DynamicSchemaLoader",
22    "ComplexFieldType",
23    "TypesMap",
24    "SchemaTypeIdentifier",
25]
33@dataclass
34class JsonFileSchemaLoader(ResourceSchemaLoader, SchemaLoader):
35    """
36    Loads a stream's schema from a JSON file read with pkgutil.get_data
37
38    Attributes:
39        file_path (Union[InterpolatedString, str]): Path to the schema JSON file, interpolated against config; _default_file_path() is used when empty
40        package_name (str): Set by get_json_schema() to the resource (package) the schema was read from
41        config (Config): The user-provided configuration as specified by the source's spec
42        parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
43    """
44
45    config: Config
46    parameters: InitVar[Mapping[str, Any]]
47    file_path: Union[InterpolatedString, str] = field(default="")
48
49    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
50        if not self.file_path:  # empty/unset path: fall back to the default location
51            self.file_path = _default_file_path()
52        self.file_path = InterpolatedString.create(self.file_path, parameters=parameters)  # evaluated against config in _get_json_filepath
53
54    def get_json_schema(self) -> Mapping[str, Any]:
55        # todo: It is worth revisiting if we can replace file_path with just file_name if every schema is in the /schemas directory
56        # this would require that we find a creative solution to store or retrieve source_name in here since the files are mounted there
57        json_schema_path = self._get_json_filepath()
58        resource, schema_path = self.extract_resource_and_schema_path(json_schema_path)
59        raw_json_file = pkgutil.get_data(resource, schema_path)  # None if the package cannot be located or its loader lacks get_data
60
61        if not raw_json_file:  # covers both None and an empty file
62            raise IOError(f"Cannot find file {json_schema_path}")
63        try:
64            raw_schema = json.loads(raw_json_file)
65        except ValueError as err:  # json.JSONDecodeError is a subclass of ValueError
66            raise RuntimeError(f"Invalid JSON file format for file {json_schema_path}") from err
67        self.package_name = resource  # NOTE(review): presumably read by _resolve_schema_references (ResourceSchemaLoader); confirm
68        return self._resolve_schema_references(raw_schema)
69
70    def _get_json_filepath(self) -> Any:
71        return self.file_path.eval(self.config)  # type: ignore # file_path is always cast to an interpolated string
72
73    @staticmethod
74    def extract_resource_and_schema_path(json_schema_path: str) -> Tuple[str, str]:
75        """
76        When the connector is running on a docker container, package_data is accessible from the resource (source_<name>), so we extract
77        the resource from the first part of the schema path and the remaining path is used to find the schema file. This is a slight
78        hack to identify the source name while we are in the airbyte_cdk module.
79        :param json_schema_path: The path to the schema JSON file
80        :return: Tuple of (resource name, path within the resource); the resource is "" when at most one path component remains
81        """
82        split_path = json_schema_path.split("/")
83
84        if split_path[0] == "" or split_path[0] == ".":  # drop one leading "/" (empty segment) or "./" prefix
85            split_path = split_path[1:]
86
87        if len(split_path) == 0:
88            return "", ""
89
90        if len(split_path) == 1:
91            return "", split_path[0]
92
93        return split_path[0], "/".join(split_path[1:])

Loads the schema from a json file

Attributes:
  • file_path (Union[InterpolatedString, str]): The path to the json file describing the schema
  • package_name (str): Set by get_json_schema() to the resource (package) the schema was read from
  • config (Config): The user-provided configuration as specified by the source's spec
  • parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
JsonFileSchemaLoader( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], file_path: Union[airbyte_cdk.InterpolatedString, str] = '')
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
file_path: Union[airbyte_cdk.InterpolatedString, str] = ''
def get_json_schema(self) -> Mapping[str, Any]:
54    def get_json_schema(self) -> Mapping[str, Any]:
55        # todo: It is worth revisiting if we can replace file_path with just file_name if every schema is in the /schemas directory
56        # this would require that we find a creative solution to store or retrieve source_name in here since the files are mounted there
57        json_schema_path = self._get_json_filepath()
58        resource, schema_path = self.extract_resource_and_schema_path(json_schema_path)
59        raw_json_file = pkgutil.get_data(resource, schema_path)  # None if the package cannot be located or its loader lacks get_data
60
61        if not raw_json_file:  # covers both None and an empty file
62            raise IOError(f"Cannot find file {json_schema_path}")
63        try:
64            raw_schema = json.loads(raw_json_file)
65        except ValueError as err:  # json.JSONDecodeError is a subclass of ValueError
66            raise RuntimeError(f"Invalid JSON file format for file {json_schema_path}") from err
67        self.package_name = resource  # NOTE(review): presumably read by _resolve_schema_references (ResourceSchemaLoader); confirm
68        return self._resolve_schema_references(raw_schema)

Returns a mapping describing the stream's schema

@staticmethod
def extract_resource_and_schema_path(json_schema_path: str) -> Tuple[str, str]:
73    @staticmethod
74    def extract_resource_and_schema_path(json_schema_path: str) -> Tuple[str, str]:
75        """
76        When the connector is running on a docker container, package_data is accessible from the resource (source_<name>), so we extract
77        the resource from the first part of the schema path and the remaining path is used to find the schema file. This is a slight
78        hack to identify the source name while we are in the airbyte_cdk module.
79        :param json_schema_path: The path to the schema JSON file
80        :return: Tuple of (resource name, path within the resource); the resource is "" when at most one path component remains
81        """
82        split_path = json_schema_path.split("/")
83
84        if split_path[0] == "" or split_path[0] == ".":  # drop one leading "/" (empty segment) or "./" prefix
85            split_path = split_path[1:]
86
87        if len(split_path) == 0:
88            return "", ""
89
90        if len(split_path) == 1:
91            return "", split_path[0]
92
93        return split_path[0], "/".join(split_path[1:])

When the connector is running in a Docker container, package data is accessible from the resource (source_<name>), so we extract the resource from the first part of the schema path and use the remaining path to find the schema file. This is a slight hack to identify the source name while we are in the airbyte_cdk module.

Parameters
  • json_schema_path: The path to the schema JSON file
Returns

Tuple of the resource name and the path to the schema file

@dataclass
class DefaultSchemaLoader(airbyte_cdk.sources.declarative.schema.SchemaLoader):
15@dataclass
16class DefaultSchemaLoader(SchemaLoader):
17    """
18    Loads a schema from the default location or returns an empty schema for streams that have not defined their schema file yet.
19
20    Attributes:
21        config (Config): The user-provided configuration as specified by the source's spec
22        parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed; parameters["name"] is also used as the stream name in the fallback log message
23    """
24
25    config: Config
26    parameters: InitVar[Mapping[str, Any]]
27
28    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
29        self._parameters = parameters  # kept so get_json_schema can log the stream name
30        self.default_loader = JsonFileSchemaLoader(parameters=parameters, config=self.config)  # no file_path, so the JSON loader uses its default path
31
32    def get_json_schema(self) -> Mapping[str, Any]:
33        """
34        Attempts to retrieve a schema from the default filepath location or returns the empty schema if a schema cannot be found.
35
36        :return: The schema loaded from the default location, or {} if loading raises OSError
37        """
38
39        try:
40            return self.default_loader.get_json_schema()
41        except OSError:  # includes IOError (an alias of OSError); other errors such as RuntimeError for invalid JSON still propagate
42            # A slight hack since we don't directly have the stream name. However, when building the default filepath we assume the
43            # runtime options stores stream name 'name' so we'll do the same here
44            stream_name = self._parameters.get("name", "")
45            logging.info(
46                f"Could not find schema for stream {stream_name}, defaulting to the empty schema"
47            )
48            return {}

Loads a schema from the default location or returns an empty schema for streams that have not defined their schema file yet.

Attributes:
  • config (Config): The user-provided configuration as specified by the source's spec
  • parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
DefaultSchemaLoader( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_json_schema(self) -> Mapping[str, Any]:
32    def get_json_schema(self) -> Mapping[str, Any]:
33        """
34        Attempts to retrieve a schema from the default filepath location or returns the empty schema if a schema cannot be found.
35
36        :return: The schema loaded from the default location, or {} if loading raises OSError
37        """
38
39        try:
40            return self.default_loader.get_json_schema()
41        except OSError:  # includes IOError (an alias of OSError); other errors such as RuntimeError for invalid JSON still propagate
42            # A slight hack since we don't directly have the stream name. However, when building the default filepath we assume the
43            # runtime options stores stream name 'name' so we'll do the same here
44            stream_name = self._parameters.get("name", "")
45            logging.info(
46                f"Could not find schema for stream {stream_name}, defaulting to the empty schema"
47            )
48            return {}

Attempts to retrieve a schema from the default filepath location or returns the empty schema if a schema cannot be found.

Returns

The schema loaded from the default location, or an empty schema if it cannot be found

@dataclass
class SchemaLoader:
11@dataclass
12class SchemaLoader:
13    """Base interface for components that provide a stream's JSON schema"""
14
15    @abstractmethod  # NOTE(review): SchemaLoader has no ABCMeta metaclass (no abc.ABC base), so this is not enforced at instantiation
16    def get_json_schema(self) -> Mapping[str, Any]:
17        """Returns a mapping describing the stream's schema"""
18        pass

Describes a stream's schema

@abstractmethod
def get_json_schema(self) -> Mapping[str, Any]:
15    @abstractmethod  # NOTE(review): SchemaLoader has no ABCMeta metaclass (no abc.ABC base), so this is not enforced at instantiation
16    def get_json_schema(self) -> Mapping[str, Any]:
17        """Returns a mapping describing the stream's schema"""
18        pass

Returns a mapping describing the stream's schema

@dataclass
class InlineSchemaLoader(airbyte_cdk.sources.declarative.schema.SchemaLoader):
12@dataclass
13class InlineSchemaLoader(SchemaLoader):
14    """Schema loader that returns the schema supplied at construction"""
15
16    schema: Dict[str, Any]
17    parameters: InitVar[Mapping[str, Any]]  # InitVar with no __post_init__ here: accepted but unused
18
19    def get_json_schema(self) -> Mapping[str, Any]:
20        return self.schema  # returned as-is, not copied

Schema loader that returns the schema supplied at construction, unchanged.

InlineSchemaLoader( schema: Dict[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
schema: Dict[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_json_schema(self) -> Mapping[str, Any]:
19    def get_json_schema(self) -> Mapping[str, Any]:
20        return self.schema  # returned as-is, not copied

Returns a mapping describing the stream's schema

@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class DynamicSchemaLoader(airbyte_cdk.sources.declarative.schema.SchemaLoader):
118@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
119@dataclass
120class DynamicSchemaLoader(SchemaLoader):
121    """
122    Builds a JSON Schema from the first record returned by `retriever`, using `schema_type_identifier` to locate each field's name and type.
123    """
124
125    retriever: Retriever
126    config: Config
127    parameters: InitVar[Mapping[str, Any]]
128    schema_type_identifier: SchemaTypeIdentifier
129    schema_transformations: List[RecordTransformation] = field(default_factory=lambda: [])
130
131    def get_json_schema(self) -> Mapping[str, Any]:
132        """
133        Constructs an object JSON Schema from the first retrieved record; if no record is read, properties start empty before schema_transformations run.
134        """
135        properties = {}
136        retrieved_record = next(self.retriever.read_records({}), None)  # type: ignore[call-overload] # read_records return Iterable data type; only the first record is used
137
138        raw_schema = (
139            self._extract_data(
140                retrieved_record,  # type: ignore[arg-type] # Expected that retrieved_record will be only Mapping[str, Any]
141                self.schema_type_identifier.schema_pointer,
142            )
143            if retrieved_record
144            else []
145        )
146
147        for property_definition in raw_schema:  # NOTE(review): assumes schema_pointer resolves to an iterable of field-definition mappings; confirm behavior when the path is missing
148            key = self._get_key(property_definition, self.schema_type_identifier.key_pointer)
149            value = self._get_type(
150                property_definition,
151                self.schema_type_identifier.type_pointer,
152            )
153            properties[key] = value  # a later definition with the same key overwrites an earlier one
154
155        transformed_properties = self._transform(properties, {})  # {} is passed as stream_state, which _transform does not use
156
157        return {
158            "$schema": "https://json-schema.org/draft-07/schema#",
159            "type": "object",
160            "additionalProperties": True,
161            "properties": transformed_properties,
162        }
163
164    def _transform(
165        self,
166        properties: Mapping[str, Any],
167        stream_state: StreamState,
168        stream_slice: Optional[StreamSlice] = None,
169    ) -> Mapping[str, Any]:
170        for transformation in self.schema_transformations:  # stream_state and stream_slice are not forwarded
171            transformation.transform(  # return value is ignored, so transformations must mutate properties in place
172                properties,  # type: ignore  # properties has type Mapping[str, Any], but Dict[str, Any] expected
173                config=self.config,
174            )
175        return properties
176
177    def _get_key(
178        self,
179        raw_schema: MutableMapping[str, Any],
180        field_key_path: List[Union[InterpolatedString, str]],
181    ) -> str:
182        """
183        Extracts the field name at `field_key_path`; raises ValueError if it is not a string.
184        """
185        field_key = self._extract_data(raw_schema, field_key_path)
186        if not isinstance(field_key, str):
187            raise ValueError(f"Expected key to be a string. Got {field_key}")
188        return field_key
189
190    def _get_type(
191        self,
192        raw_schema: MutableMapping[str, Any],
193        field_type_path: Optional[List[Union[InterpolatedString, str]]],
194    ) -> Union[Mapping[str, Any], List[Mapping[str, Any]]]:
195        """
196        Determines the JSON Schema type for a field (default "string"); a two-item list of type names becomes a oneOf, a ComplexFieldType is resolved recursively, and any other shape raises ValueError.
197        """
198        raw_field_type = (
199            self._extract_data(raw_schema, field_type_path, default="string")
200            if field_type_path
201            else "string"
202        )
203        mapped_field_type = self._replace_type_if_not_valid(raw_field_type, raw_schema)
204        if (  # e.g. a two-item list of type names -> {"oneOf": [type_a, type_b]}
205            isinstance(mapped_field_type, list)
206            and len(mapped_field_type) == 2
207            and all(isinstance(item, str) for item in mapped_field_type)
208        ):
209            first_type = self._get_airbyte_type(mapped_field_type[0])
210            second_type = self._get_airbyte_type(mapped_field_type[1])
211            return {"oneOf": [first_type, second_type]}
212
213        elif isinstance(mapped_field_type, str):
214            return self._get_airbyte_type(mapped_field_type)
215
216        elif isinstance(mapped_field_type, ComplexFieldType):
217            return self._resolve_complex_type(mapped_field_type)
218
219        else:
220            raise ValueError(
221                f"Invalid data type. Available string or two items list of string. Got {mapped_field_type}."
222            )
223
224    def _resolve_complex_type(self, complex_type: ComplexFieldType) -> Mapping[str, Any]:  # builds the base type plus, when present, a (possibly nested) "items" schema
225        if not complex_type.items:
226            return self._get_airbyte_type(complex_type.field_type)
227
228        field_type = self._get_airbyte_type(complex_type.field_type)
229
230        field_type["items"] = (  # safe to mutate: _get_airbyte_type returns a deep copy
231            self._get_airbyte_type(complex_type.items)
232            if isinstance(complex_type.items, str)
233            else self._resolve_complex_type(complex_type.items)
234        )
235
236        return field_type
237
238    def _replace_type_if_not_valid(
239        self,
240        field_type: Union[List[str], str],
241        raw_schema: MutableMapping[str, Any],
242    ) -> Union[List[str], str, ComplexFieldType]:
243        """
244        Returns the `target_type` of the first `types_mapping` entry whose `current_type` equals `field_type` and whose condition is true; otherwise returns `field_type` unchanged.
245        """
246        if self.schema_type_identifier.types_mapping:
247            for types_map in self.schema_type_identifier.types_mapping:
248                # condition is optional; a missing condition is evaluated as "True"
249                condition = InterpolatedBoolean(
250                    condition=types_map.condition if types_map.condition is not None else "True",
251                    parameters={},
252                ).eval(config=self.config, raw_schema=raw_schema)  # raw_schema (the field definition) is passed as extra interpolation context
253
254                if field_type == types_map.current_type and condition:
255                    return types_map.target_type
256        return field_type
257
258    @staticmethod
259    def _get_airbyte_type(field_type: str) -> MutableMapping[str, Any]:
260        """
261        Returns a deep copy of AIRBYTE_DATA_TYPES[field_type]; raises ValueError for an unknown type name.
262        """
263        if field_type not in AIRBYTE_DATA_TYPES:
264            raise ValueError(f"Invalid Airbyte data type: {field_type}")
265
266        return deepcopy(AIRBYTE_DATA_TYPES[field_type])
267
268    def _extract_data(
269        self,
270        body: Mapping[str, Any],
271        extraction_path: Optional[List[Union[InterpolatedString, str]]] = None,
272        default: Any = None,
273    ) -> Any:
274        """
275        Returns `body` when `extraction_path` is empty; otherwise evaluates non-string path nodes against config and reads the path with dpath.get, passing `default` through for the no-match case.
276        """
277
278        if not extraction_path:
279            return body
280
281        path = [
282            node.eval(self.config) if not isinstance(node, str) else node
283            for node in extraction_path
284        ]
285
286        return dpath.get(body, path, default=default)  # type: ignore # extracted will be a MutableMapping, given input data structure

Dynamically loads a JSON Schema by extracting data from retrieved records.

DynamicSchemaLoader( retriever: airbyte_cdk.sources.declarative.retrievers.Retriever, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], schema_type_identifier: SchemaTypeIdentifier, schema_transformations: List[airbyte_cdk.RecordTransformation] = <factory>)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
schema_type_identifier: SchemaTypeIdentifier
schema_transformations: List[airbyte_cdk.RecordTransformation]
def get_json_schema(self) -> Mapping[str, Any]:
131    def get_json_schema(self) -> Mapping[str, Any]:
132        """
133        Constructs an object JSON Schema from the first retrieved record; if no record is read, properties start empty before schema_transformations run.
134        """
135        properties = {}
136        retrieved_record = next(self.retriever.read_records({}), None)  # type: ignore[call-overload] # read_records return Iterable data type; only the first record is used
137
138        raw_schema = (
139            self._extract_data(
140                retrieved_record,  # type: ignore[arg-type] # Expected that retrieved_record will be only Mapping[str, Any]
141                self.schema_type_identifier.schema_pointer,
142            )
143            if retrieved_record
144            else []
145        )
146
147        for property_definition in raw_schema:  # NOTE(review): assumes schema_pointer resolves to an iterable of field-definition mappings; confirm behavior when the path is missing
148            key = self._get_key(property_definition, self.schema_type_identifier.key_pointer)
149            value = self._get_type(
150                property_definition,
151                self.schema_type_identifier.type_pointer,
152            )
153            properties[key] = value  # a later definition with the same key overwrites an earlier one
154
155        transformed_properties = self._transform(properties, {})  # {} is passed as stream_state, which _transform does not use
156
157        return {
158            "$schema": "https://json-schema.org/draft-07/schema#",
159            "type": "object",
160            "additionalProperties": True,
161            "properties": transformed_properties,
162        }

Constructs a JSON Schema based on retrieved data.

@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass(frozen=True)
class ComplexFieldType:
49@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
50@dataclass(frozen=True)
51class ComplexFieldType:
52    """
53    Describes a complex field type: a base `field_type` plus, for arrays, an optional `items` element type
54    """
55
56    field_type: str
57    items: Optional[Union[str, "ComplexFieldType"]] = None  # element type: a type name or a nested ComplexFieldType
58
59    def __post_init__(self) -> None:
60        """
61        Enforces that `items` is only used when `field_type` is an array; raises ValueError otherwise
62        """
63        # `items` is valid only when `field_type` is "array"
64        if self.items and self.field_type != "array":
65            raise ValueError("'items' can only be used when 'field_type' is an array.")

Identifies a complex field type.

ComplexFieldType( field_type: str, items: Union[str, ComplexFieldType, NoneType] = None)
field_type: str
items: Union[str, ComplexFieldType, NoneType] = None
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass(frozen=True)
class TypesMap:
68@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
69@dataclass(frozen=True)
70class TypesMap:
71    """
72    Represents a mapping between a current type and its corresponding target type.
73    """
74
75    target_type: Union[List[str], str, ComplexFieldType]  # type substituted when the mapping applies
76    current_type: Union[List[str], str]  # raw type value to match (compared with ==)
77    condition: Optional[str]  # interpolated boolean expression; no default, so pass None for an unconditional mapping

Represents a mapping between a current type and its corresponding target type.

TypesMap( target_type: Union[List[str], str, ComplexFieldType], current_type: Union[List[str], str], condition: Optional[str])
target_type: Union[List[str], str, ComplexFieldType]
current_type: Union[List[str], str]
condition: Optional[str]
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class SchemaTypeIdentifier:
 80@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
 81@dataclass
 82class SchemaTypeIdentifier:
 83    """
 84    Identifies schema details for dynamic schema extraction: where the field list, each field's name, and each field's type live in a record.
 85    """
 86
 87    key_pointer: List[Union[InterpolatedString, str]]  # path to the field name within each field definition
 88    parameters: InitVar[Mapping[str, Any]]
 89    type_pointer: Optional[List[Union[InterpolatedString, str]]] = None  # path to the field type; None means fields default to "string"
 90    types_mapping: Optional[List[TypesMap]] = None  # optional type substitutions applied to extracted types
 91    schema_pointer: Optional[List[Union[InterpolatedString, str]]] = None  # path to the list of field definitions; normalized to [] when unset
 92
 93    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 94        self.schema_pointer = (
 95            self._update_pointer(self.schema_pointer, parameters) if self.schema_pointer else []
 96        )  # type: ignore[assignment]  # This is required field in model
 97        self.key_pointer = self._update_pointer(self.key_pointer, parameters)  # type: ignore[assignment]  # This is required field in model
 98        self.type_pointer = (
 99            self._update_pointer(self.type_pointer, parameters) if self.type_pointer else None
100        )
101
102    @staticmethod
103    def _update_pointer(
104        pointer: Optional[List[Union[InterpolatedString, str]]], parameters: Mapping[str, Any]
105    ) -> Optional[List[Union[InterpolatedString, str]]]:
106        return (  # wrap plain-string path nodes as InterpolatedString; other nodes are kept as-is; empty/None pointer -> None
107            [
108                InterpolatedString.create(path, parameters=parameters)
109                if isinstance(path, str)
110                else path
111                for path in pointer
112            ]
113            if pointer
114            else None
115        )

Identifies schema details for dynamic schema extraction and processing.

SchemaTypeIdentifier( key_pointer: List[Union[airbyte_cdk.InterpolatedString, str]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], type_pointer: Optional[List[Union[airbyte_cdk.InterpolatedString, str]]] = None, types_mapping: Optional[List[TypesMap]] = None, schema_pointer: Optional[List[Union[airbyte_cdk.InterpolatedString, str]]] = None)
key_pointer: List[Union[airbyte_cdk.InterpolatedString, str]]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
type_pointer: Optional[List[Union[airbyte_cdk.InterpolatedString, str]]] = None
types_mapping: Optional[List[TypesMap]] = None
schema_pointer: Optional[List[Union[airbyte_cdk.InterpolatedString, str]]] = None