airbyte_cdk.sources.declarative.schema
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5from airbyte_cdk.sources.declarative.schema.default_schema_loader import DefaultSchemaLoader 6from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import ( 7 ComplexFieldType, 8 DynamicSchemaLoader, 9 SchemaTypeIdentifier, 10 TypesMap, 11) 12from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader 13from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader 14from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader 15 16__all__ = [ 17 "JsonFileSchemaLoader", 18 "DefaultSchemaLoader", 19 "SchemaLoader", 20 "InlineSchemaLoader", 21 "DynamicSchemaLoader", 22 "ComplexFieldType", 23 "TypesMap", 24 "SchemaTypeIdentifier", 25]
@dataclass
class JsonFileSchemaLoader(ResourceSchemaLoader, SchemaLoader):
    """
    Loads a stream's JSON schema from a file bundled with the connector package.

    Attributes:
        file_path (Union[InterpolatedString, str]): The path to the json file describing the schema
        name (str): The stream's name
        config (Config): The user-provided configuration as specified by the source's spec
        parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    file_path: Union[InterpolatedString, str] = field(default="")

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Fall back to the conventional default location when no explicit path was given,
        # then wrap the path so it can be interpolated against the config at eval time.
        raw_path = self.file_path or _default_file_path()
        self.file_path = InterpolatedString.create(raw_path, parameters=parameters)

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Reads the schema file from the package data, parses it, and resolves shared $ref definitions.

        :raises IOError: if the schema file cannot be located in the package data
        :raises RuntimeError: if the schema file does not contain valid JSON
        """
        # todo: It is worth revisiting if we can replace file_path with just file_name if every schema is in the /schemas directory
        # this would require that we find a creative solution to store or retrieve source_name in here since the files are mounted there
        schema_filepath = self._get_json_filepath()
        resource_name, relative_schema_path = self.extract_resource_and_schema_path(
            schema_filepath
        )
        raw_bytes = pkgutil.get_data(resource_name, relative_schema_path)

        if not raw_bytes:
            raise IOError(f"Cannot find file {schema_filepath}")
        try:
            parsed_schema = json.loads(raw_bytes)
        except ValueError as err:
            raise RuntimeError(f"Invalid JSON file format for file {schema_filepath}") from err
        # Remember the resource (package) name so downstream $ref resolution can find shared schemas.
        self.package_name = resource_name
        return self._resolve_schema_references(parsed_schema)

    def _get_json_filepath(self) -> Any:
        # file_path is always cast to an InterpolatedString in __post_init__
        return self.file_path.eval(self.config)  # type: ignore # file_path is always cast to an interpolated string

    @staticmethod
    def extract_resource_and_schema_path(json_schema_path: str) -> Tuple[str, str]:
        """
        Splits a schema path into (resource name, path within the resource).

        When the connector runs in a Docker container, package_data is accessible from the
        resource (source_<name>), so the first path segment is treated as the resource and the
        remainder locates the schema file. This is a slight hack to identify the source name
        while we are in the airbyte_cdk module.

        :param json_schema_path: The path to the schema JSON file
        :return: Tuple of the resource name and the path to the schema file
        """
        segments = json_schema_path.split("/")

        # Drop a leading "" (absolute path) or "." (relative path) segment.
        if segments[0] in ("", "."):
            segments = segments[1:]

        if not segments:
            return "", ""
        if len(segments) == 1:
            return "", segments[0]
        return segments[0], "/".join(segments[1:])
Loads the schema from a json file
Attributes:
- file_path (Union[InterpolatedString, str]): The path to the json file describing the schema
- name (str): The stream's name
- config (Config): The user-provided configuration as specified by the source's spec
- parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Loads the schema JSON file from the package data and resolves shared schema references.

        :return: the parsed schema as a mapping
        :raises IOError: if the schema file cannot be found in the package data
        :raises RuntimeError: if the schema file is not valid JSON
        """
        # todo: It is worth revisiting if we can replace file_path with just file_name if every schema is in the /schemas directory
        # this would require that we find a creative solution to store or retrieve source_name in here since the files are mounted there
        json_schema_path = self._get_json_filepath()
        resource, schema_path = self.extract_resource_and_schema_path(json_schema_path)
        raw_json_file = pkgutil.get_data(resource, schema_path)

        if not raw_json_file:
            raise IOError(f"Cannot find file {json_schema_path}")
        try:
            raw_schema = json.loads(raw_json_file)
        except ValueError as err:
            raise RuntimeError(f"Invalid JSON file format for file {json_schema_path}") from err
        # Remember the resource (package) name so downstream reference resolution can locate shared schemas.
        self.package_name = resource
        return self._resolve_schema_references(raw_schema)
Returns a mapping describing the stream's schema
73 @staticmethod 74 def extract_resource_and_schema_path(json_schema_path: str) -> Tuple[str, str]: 75 """ 76 When the connector is running on a docker container, package_data is accessible from the resource (source_<name>), so we extract 77 the resource from the first part of the schema path and the remaining path is used to find the schema file. This is a slight 78 hack to identify the source name while we are in the airbyte_cdk module. 79 :param json_schema_path: The path to the schema JSON file 80 :return: Tuple of the resource name and the path to the schema file 81 """ 82 split_path = json_schema_path.split("/") 83 84 if split_path[0] == "" or split_path[0] == ".": 85 split_path = split_path[1:] 86 87 if len(split_path) == 0: 88 return "", "" 89 90 if len(split_path) == 1: 91 return "", split_path[0] 92 93 return split_path[0], "/".join(split_path[1:])
When the connector is running in a Docker container, package_data is accessible from the resource (source_<name>), so the resource name is extracted from the first part of the schema path and the remainder of the path locates the schema file.
Parameters
- json_schema_path: The path to the schema JSON file
Returns
Tuple of the resource name and the path to the schema file
@dataclass
class DefaultSchemaLoader(SchemaLoader):
    """
    Fallback loader: tries the default schema file location and degrades to an empty
    schema for streams that have not defined their schema file yet.

    Attributes:
        config (Config): The user-provided configuration as specified by the source's spec
        parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Delegate to a JsonFileSchemaLoader with no explicit file_path so it uses
        # the default schema location; keep parameters for stream-name lookup later.
        self.default_loader = JsonFileSchemaLoader(parameters=parameters, config=self.config)
        self._parameters = parameters

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Retrieves the schema from the default filepath location, or the empty schema if none is found.

        :return: the loaded schema, or {} when no schema file exists
        """
        try:
            schema = self.default_loader.get_json_schema()
        except (OSError, ValueError):
            # A slight hack since we don't directly have the stream name. However, when building the default filepath we assume the
            # runtime options stores stream name 'name' so we'll do the same here
            name = self._parameters.get("name", "")
            logging.info(
                f"Could not find schema for stream {name}, defaulting to the empty schema"
            )
            return {}
        return schema
Loads a schema from the default location or returns an empty schema for streams that have not defined their schema file yet.
Attributes:
- config (Config): The user-provided configuration as specified by the source's spec
- parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Attempts to retrieve a schema from the default filepath location or returns the empty schema if a schema cannot be found.

        :return: The loaded schema, or the empty schema ({}) when the default schema file is missing or unreadable
        """

        try:
            return self.default_loader.get_json_schema()
        except (OSError, ValueError):
            # A slight hack since we don't directly have the stream name. However, when building the default filepath we assume the
            # runtime options stores stream name 'name' so we'll do the same here
            stream_name = self._parameters.get("name", "")
            logging.info(
                f"Could not find schema for stream {stream_name}, defaulting to the empty schema"
            )
            return {}
Attempts to retrieve a schema from the default filepath location or returns the empty schema if a schema cannot be found.
Returns
The empty schema
@dataclass
class SchemaLoader:
    """Interface for components that describe a stream's schema."""

    @abstractmethod
    def get_json_schema(self) -> Mapping[str, Any]:
        """Return a mapping describing the stream's schema."""
        ...
Describes a stream's schema
@dataclass
class InlineSchemaLoader(SchemaLoader):
    """Schema loader that returns a schema declared inline, exactly as provided."""

    # The schema mapping supplied directly in the component definition.
    schema: Dict[str, Any]
    parameters: InitVar[Mapping[str, Any]]

    def get_json_schema(self) -> Mapping[str, Any]:
        """Return the inline schema without modification."""
        return self.schema
Describes a stream's schema
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class DynamicSchemaLoader(SchemaLoader):
    """
    Dynamically loads a JSON Schema by extracting data from retrieved records.
    """

    # Retriever used to fetch the sample record from which the schema is inferred.
    retriever: Retriever
    # The user-provided configuration as specified by the source's spec.
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    # Pointers describing where the schema list, field keys and field types live inside a record.
    schema_type_identifier: SchemaTypeIdentifier
    # Transformations applied to the inferred properties before the schema is returned.
    schema_transformations: List[RecordTransformation] = field(default_factory=lambda: [])
    # Optional filter applied to the inferred properties before transformation.
    schema_filter: Optional[RecordFilter] = None

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Constructs a JSON Schema based on retrieved data.
        """
        properties = {}
        # Only the first record is sampled; if the stream is empty no properties are inferred.
        retrieved_record = next(self.retriever.read_records({}), None)  # type: ignore[call-overload] # read_records return Iterable data type

        raw_schema = (
            self._extract_data(
                retrieved_record,  # type: ignore[arg-type] # Expected that retrieved_record will be only Mapping[str, Any]
                self.schema_type_identifier.schema_pointer,
            )
            if retrieved_record
            else []
        )

        for property_definition in raw_schema:
            key = self._get_key(property_definition, self.schema_type_identifier.key_pointer)
            value = self._get_type(
                property_definition,
                self.schema_type_identifier.type_pointer,
            )
            properties[key] = value

        # Filter first, then transform, so transformations only see properties that survived filtering.
        filtered_transformed_properties = self._transform(self._filter(properties))

        return {
            "$schema": "https://json-schema.org/draft-07/schema#",
            "type": "object",
            "additionalProperties": True,
            "properties": filtered_transformed_properties,
        }

    def _transform(
        self,
        properties: Mapping[str, Any],
    ) -> Mapping[str, Any]:
        # Applies each configured schema transformation in order, mutating `properties` in place.
        for transformation in self.schema_transformations:
            transformation.transform(
                properties,  # type: ignore  # properties has type Mapping[str, Any], but Dict[str, Any] expected
                config=self.config,
            )
        return properties

    def _filter(
        self,
        properties: Mapping[str, Any],
    ) -> Mapping[str, Any]:
        # Drops properties rejected by schema_filter; no-op when no filter is configured.
        if not self.schema_filter:
            return properties

        filtered_properties: MutableMapping[str, Any] = {}
        # The filter operates on record-shaped mappings, so each property is wrapped in its own
        # single-entry dict before being passed through filter_records.
        for item in self.schema_filter.filter_records(
            ({k: v} for k, v in properties.items()),
            {},
        ):
            filtered_properties.update(item)
        return filtered_properties

    def _get_key(
        self,
        raw_schema: MutableMapping[str, Any],
        field_key_path: List[Union[InterpolatedString, str]],
    ) -> str:
        """
        Extracts the key field from the schema using the specified path.
        """
        field_key = self._extract_data(raw_schema, field_key_path)
        if not isinstance(field_key, str):
            raise ValueError(f"Expected key to be a string. Got {field_key}")
        return field_key

    def _get_type(
        self,
        raw_schema: MutableMapping[str, Any],
        field_type_path: Optional[List[Union[InterpolatedString, str]]],
    ) -> Union[Mapping[str, Any], List[Mapping[str, Any]]]:
        """
        Determines the JSON Schema type for a field, supporting nullable and combined types.
        """
        # When no type pointer is configured, every field defaults to "string".
        raw_field_type = (
            self._extract_data(raw_schema, field_type_path, default="string")
            if field_type_path
            else "string"
        )
        mapped_field_type = self._replace_type_if_not_valid(raw_field_type, raw_schema)
        if (
            isinstance(mapped_field_type, list)
            and len(mapped_field_type) == 2
            and all(isinstance(item, str) for item in mapped_field_type)
        ):
            # A two-item list of type names is rendered as a "oneOf" of the two Airbyte types.
            first_type = self._get_airbyte_type(mapped_field_type[0])
            second_type = self._get_airbyte_type(mapped_field_type[1])
            return {"oneOf": [first_type, second_type]}

        elif isinstance(mapped_field_type, str):
            return self._get_airbyte_type(mapped_field_type)

        elif isinstance(mapped_field_type, ComplexFieldType):
            return self._resolve_complex_type(mapped_field_type)

        else:
            raise ValueError(
                f"Invalid data type. Available string or two items list of string. Got {mapped_field_type}."
            )

    def _resolve_complex_type(self, complex_type: ComplexFieldType) -> Mapping[str, Any]:
        # Resolves a ComplexFieldType into a JSON Schema definition, recursing into array item types.
        if not complex_type.items:
            return self._get_airbyte_type(complex_type.field_type)

        field_type = self._get_airbyte_type(complex_type.field_type)

        field_type["items"] = (
            self._get_airbyte_type(complex_type.items)
            if isinstance(complex_type.items, str)
            else self._resolve_complex_type(complex_type.items)
        )

        return field_type

    def _replace_type_if_not_valid(
        self,
        field_type: Union[List[str], str],
        raw_schema: MutableMapping[str, Any],
    ) -> Union[List[str], str, ComplexFieldType]:
        """
        Replaces a field type if it matches a type mapping in `types_map`.
        """
        if self.schema_type_identifier.types_mapping:
            for types_map in self.schema_type_identifier.types_mapping:
                # conditional is optional param, setting to true if not provided
                condition = InterpolatedBoolean(
                    condition=types_map.condition if types_map.condition is not None else "True",
                    parameters={},
                ).eval(config=self.config, raw_schema=raw_schema)

                if field_type == types_map.current_type and condition:
                    return types_map.target_type
        return field_type

    @staticmethod
    def _get_airbyte_type(field_type: str) -> MutableMapping[str, Any]:
        """
        Maps a field type to its corresponding Airbyte type definition.
        """
        if field_type not in AIRBYTE_DATA_TYPES:
            raise ValueError(f"Invalid Airbyte data type: {field_type}")

        # deepcopy so callers can mutate the returned definition (e.g. to attach "items")
        # without corrupting the shared AIRBYTE_DATA_TYPES table.
        return deepcopy(AIRBYTE_DATA_TYPES[field_type])

    def _extract_data(
        self,
        body: Mapping[str, Any],
        extraction_path: Optional[List[Union[InterpolatedString, str]]] = None,
        default: Any = None,
    ) -> Any:
        """
        Extracts data from the body based on the provided extraction path.
        """

        if not extraction_path:
            return body

        # Interpolated path segments are evaluated against the config before the dpath lookup.
        path = [
            node.eval(self.config) if not isinstance(node, str) else node
            for node in extraction_path
        ]

        return dpath.get(body, path, default=default)  # type: ignore # extracted will be a MutableMapping, given input data structure
Dynamically loads a JSON Schema by extracting data from retrieved records.
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Constructs a JSON Schema based on retrieved data.

        :return: a draft-07 object schema whose properties are inferred from the first record
        """
        properties = {}
        # Only the first record is sampled; an empty stream yields no inferred properties.
        retrieved_record = next(self.retriever.read_records({}), None)  # type: ignore[call-overload] # read_records return Iterable data type

        raw_schema = (
            self._extract_data(
                retrieved_record,  # type: ignore[arg-type] # Expected that retrieved_record will be only Mapping[str, Any]
                self.schema_type_identifier.schema_pointer,
            )
            if retrieved_record
            else []
        )

        for property_definition in raw_schema:
            key = self._get_key(property_definition, self.schema_type_identifier.key_pointer)
            value = self._get_type(
                property_definition,
                self.schema_type_identifier.type_pointer,
            )
            properties[key] = value

        # Filter first, then transform, so transformations only see surviving properties.
        filtered_transformed_properties = self._transform(self._filter(properties))

        return {
            "$schema": "https://json-schema.org/draft-07/schema#",
            "type": "object",
            "additionalProperties": True,
            "properties": filtered_transformed_properties,
        }
Constructs a JSON Schema based on retrieved data.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass(frozen=True)
class ComplexFieldType:
    """
    Describes a complex field type, optionally carrying the item type for arrays.
    """

    field_type: str
    items: Optional[Union[str, "ComplexFieldType"]] = None

    def __post_init__(self) -> None:
        """
        Enforces that `items` is only supplied when `field_type` is an array.
        """
        # `items` only makes sense for array target types
        if self.field_type != "array" and self.items:
            raise ValueError("'items' can only be used when 'field_type' is an array.")
Identifies complex field type
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass(frozen=True)
class TypesMap:
    """
    Represents a mapping between a current type and its corresponding target type.

    Attributes:
        target_type: the type to emit when `current_type` matches
        current_type: the source type (or list of types) to be replaced
        condition: optional interpolated boolean expression gating the mapping;
            None means the mapping applies unconditionally
    """

    target_type: Union[List[str], str, ComplexFieldType]
    current_type: Union[List[str], str]
    # The consumer (DynamicSchemaLoader._replace_type_if_not_valid) already treats a missing
    # condition as always-true, so default to None instead of forcing callers to pass it.
    condition: Optional[str] = None
Represents a mapping between a current type and its corresponding target type.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class SchemaTypeIdentifier:
    """
    Identifies schema details for dynamic schema extraction and processing.
    """

    key_pointer: List[Union[InterpolatedString, str]]
    parameters: InitVar[Mapping[str, Any]]
    type_pointer: Optional[List[Union[InterpolatedString, str]]] = None
    types_mapping: Optional[List[TypesMap]] = None
    schema_pointer: Optional[List[Union[InterpolatedString, str]]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Interpolate every pointer segment up front so eval-time lookups are cheap.
        if self.schema_pointer:
            self.schema_pointer = self._update_pointer(self.schema_pointer, parameters)  # type: ignore[assignment] # This is required field in model
        else:
            self.schema_pointer = []  # type: ignore[assignment] # This is required field in model
        self.key_pointer = self._update_pointer(self.key_pointer, parameters)  # type: ignore[assignment] # This is required field in model
        self.type_pointer = (
            self._update_pointer(self.type_pointer, parameters) if self.type_pointer else None
        )

    @staticmethod
    def _update_pointer(
        pointer: Optional[List[Union[InterpolatedString, str]]], parameters: Mapping[str, Any]
    ) -> Optional[List[Union[InterpolatedString, str]]]:
        # Wraps raw string segments in InterpolatedString; already-interpolated segments pass through.
        if not pointer:
            return None
        return [
            InterpolatedString.create(segment, parameters=parameters)
            if isinstance(segment, str)
            else segment
            for segment in pointer
        ]
Identifies schema details for dynamic schema extraction and processing.