airbyte_cdk.sources.declarative.schema
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5from airbyte_cdk.sources.declarative.schema.default_schema_loader import DefaultSchemaLoader 6from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import ( 7 ComplexFieldType, 8 DynamicSchemaLoader, 9 SchemaTypeIdentifier, 10 TypesMap, 11) 12from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader 13from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader 14from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader 15 16__all__ = [ 17 "JsonFileSchemaLoader", 18 "DefaultSchemaLoader", 19 "SchemaLoader", 20 "InlineSchemaLoader", 21 "DynamicSchemaLoader", 22 "ComplexFieldType", 23 "TypesMap", 24 "SchemaTypeIdentifier", 25]
@dataclass
class JsonFileSchemaLoader(ResourceSchemaLoader, SchemaLoader):
    """
    Loads the schema from a JSON file shipped as package data.

    Attributes:
        file_path (Union[InterpolatedString, str]): The path to the JSON file describing the schema,
            in the form ``<package>/<path inside package>`` (see ``extract_resource_and_schema_path``).
            It is interpolated against ``config`` each time the schema is loaded. When empty, the path
            returned by ``_default_file_path()`` is used instead.
        config (Config): The user-provided configuration as specified by the source's spec
        parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    file_path: Union[InterpolatedString, str] = field(default="")

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Fall back to the conventional location when no explicit path was configured.
        if not self.file_path:
            self.file_path = _default_file_path()
        # Normalize to an InterpolatedString so _get_json_filepath can always call .eval().
        self.file_path = InterpolatedString.create(self.file_path, parameters=parameters)

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Read the schema file from package data, parse it and resolve its references.

        :return: The parsed schema after ``_resolve_schema_references`` has been applied
        :raises IOError: If no data (``None`` or empty bytes) was returned for the file;
            ``pkgutil.get_data`` itself may also raise an ``OSError`` when the file is missing
        :raises RuntimeError: If the file content is not valid JSON
        """
        # todo: It is worth revisiting if we can replace file_path with just file_name if every schema is in the /schemas directory
        # this would require that we find a creative solution to store or retrieve source_name in here since the files are mounted there
        json_schema_path = self._get_json_filepath()
        resource, schema_path = self.extract_resource_and_schema_path(json_schema_path)
        raw_json_file = pkgutil.get_data(resource, schema_path)

        if not raw_json_file:
            raise IOError(f"Cannot find file {json_schema_path}")
        try:
            raw_schema = json.loads(raw_json_file)
        except ValueError as err:
            raise RuntimeError(f"Invalid JSON file format for file {json_schema_path}") from err
        # Recorded only after a successful parse.
        # NOTE(review): presumably read by the inherited _resolve_schema_references.
        self.package_name = resource
        return self._resolve_schema_references(raw_schema)

    def _get_json_filepath(self) -> Any:
        """Evaluate the interpolated ``file_path`` against ``config``."""
        return self.file_path.eval(self.config)  # type: ignore # file_path is always cast to an interpolated string

    @staticmethod
    def extract_resource_and_schema_path(json_schema_path: str) -> Tuple[str, str]:
        """
        When the connector is running on a docker container, package_data is accessible from the resource (source_<name>), so we extract
        the resource from the first part of the schema path and the remaining path is used to find the schema file. This is a slight
        hack to identify the source name while we are in the airbyte_cdk module.

        A leading ``/`` or ``./`` is ignored. A single-segment path yields an empty resource name.
        If no segments remain, both elements are empty strings.

        :param json_schema_path: The path to the schema JSON file
        :return: Tuple of the resource name and the path to the schema file
        """
        split_path = json_schema_path.split("/")

        # Drop the empty segment produced by a leading "/" or an explicit "." segment.
        if split_path[0] == "" or split_path[0] == ".":
            split_path = split_path[1:]

        if len(split_path) == 0:
            return "", ""

        if len(split_path) == 1:
            return "", split_path[0]

        return split_path[0], "/".join(split_path[1:])
Loads the schema from a JSON file.
Attributes:
- file_path (Union[InterpolatedString, str]): The path to the json file describing the schema
- name (str): The stream's name
- config (Config): The user-provided configuration as specified by the source's spec
- parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
def get_json_schema(self) -> Mapping[str, Any]:
    """
    Load the schema file named by ``file_path`` from package data.

    The configured path is split into a package (resource) name and a path inside that
    package. The bytes are read with ``pkgutil.get_data`` and parsed as JSON.

    :return: The parsed schema after ``_resolve_schema_references`` has been applied
    :raises IOError: If no data was returned for the file
    :raises RuntimeError: If the file content is not valid JSON
    """
    # todo: consider replacing file_path with just a file name if every schema lives in /schemas;
    # that would need another way to obtain the source (package) name, since the files are mounted there
    path = self._get_json_filepath()
    package, relative_path = self.extract_resource_and_schema_path(path)
    content = pkgutil.get_data(package, relative_path)
    if not content:
        raise IOError(f"Cannot find file {path}")

    try:
        parsed = json.loads(content)
    except ValueError as decode_error:
        raise RuntimeError(f"Invalid JSON file format for file {path}") from decode_error

    # Only recorded once parsing succeeded.
    # NOTE(review): presumably consumed by _resolve_schema_references.
    self.package_name = package
    return self._resolve_schema_references(parsed)
Returns a mapping describing the stream's schema
@staticmethod
def extract_resource_and_schema_path(json_schema_path: str) -> Tuple[str, str]:
    """
    Split a schema path into ``(resource, path_within_resource)``.

    When the connector runs in a Docker container, package data is reachable through the
    connector's package (``source_<name>``). The first path segment is therefore treated as
    that package, and the remaining segments as the schema file's location inside it. This is
    a slight hack to recover the source name from within the airbyte_cdk module.

    A leading empty segment (from a leading ``/``) or a leading ``.`` segment is discarded.

    :param json_schema_path: The path to the schema JSON file
    :return: Tuple of the resource name and the path to the schema file. The resource is
        ``""`` for a single-segment path, and both parts are ``""`` when nothing remains.
    """
    head, *tail = json_schema_path.split("/")
    segments = tail if head in ("", ".") else [head, *tail]

    if not segments:
        return "", ""
    if len(segments) == 1:
        return "", segments[0]

    resource, *rest = segments
    return resource, "/".join(rest)
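When the connector is running in a Docker container, package_data is accessible from the resource (source_&lt;name&gt;), so we extract the resource from the first part of the schema path and use the remaining path to find the schema file. This is a slight hack to identify the source name while we are in the airbyte_cdk module.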
Parameters
- json_schema_path: The path to the schema JSON file
Returns
Tuple of the resource name and the path to the schema file
@dataclass
class DefaultSchemaLoader(SchemaLoader):
    """
    Loads a schema from the default location or returns an empty schema for streams that have not defined their schema file yet.

    Attributes:
        config (Config): The user-provided configuration as specified by the source's spec
        parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed.
            Its ``"name"`` entry, if present, is used in the log message emitted when no schema is found.
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters
        # No file_path is passed, so the JsonFileSchemaLoader falls back to its default path.
        self.default_loader = JsonFileSchemaLoader(parameters=parameters, config=self.config)

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Attempts to retrieve a schema from the default filepath location or returns the empty schema if a schema cannot be found.

        :return: The schema produced by the default JSON file loader, or ``{}`` if loading it
            raised an ``OSError`` (e.g. the schema file does not exist). Other exceptions propagate.
        """
        try:
            return self.default_loader.get_json_schema()
        except OSError:
            # A slight hack since we don't directly have the stream name. However, when building the default filepath we assume the
            # runtime options stores stream name 'name' so we'll do the same here
            stream_name = self._parameters.get("name", "")
            # Lazy %-style arguments: the message is only formatted if the record is emitted.
            logging.info(
                "Could not find schema for stream %s, defaulting to the empty schema", stream_name
            )
            return {}
Loads a schema from the default location or returns an empty schema for streams that have not defined their schema file yet.
Attributes:
- config (Config): The user-provided configuration as specified by the source's spec
- parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
def get_json_schema(self) -> Mapping[str, Any]:
    """
    Attempts to retrieve a schema from the default filepath location or returns the empty schema if a schema cannot be found.

    :return: The schema produced by ``self.default_loader``, or ``{}`` if loading it raised an
        ``OSError`` (e.g. the schema file does not exist). Other exceptions propagate.
    """
    try:
        return self.default_loader.get_json_schema()
    except OSError:
        # A slight hack since we don't directly have the stream name. However, when building the default filepath we assume the
        # runtime options stores stream name 'name' so we'll do the same here
        stream_name = self._parameters.get("name", "")
        # Lazy %-style arguments: the message is only formatted if the record is emitted.
        logging.info(
            "Could not find schema for stream %s, defaulting to the empty schema", stream_name
        )
        return {}
Attempts to retrieve a schema from the default filepath location or returns the empty schema if a schema cannot be found.
Returns
The schema loaded from the default location, or an empty schema if it cannot be found.
@dataclass
class SchemaLoader:
    """
    Interface for components that produce a stream's JSON schema.

    Note: this class does not derive from ``abc.ABC``, so the ``@abstractmethod`` marker is
    not enforced at instantiation time. Subclasses are nevertheless expected to override
    ``get_json_schema``.
    """

    @abstractmethod
    def get_json_schema(self) -> Mapping[str, Any]:
        """Return a mapping describing the stream's JSON schema."""
Describes a stream's schema
@dataclass
class InlineSchemaLoader(SchemaLoader):
    """
    Schema loader that returns a schema supplied directly as a value.

    Attributes:
        schema (Dict[str, Any]): The JSON schema returned by ``get_json_schema``
        parameters (Mapping[str, Any]): Init-only argument; accepted but not used by this class
    """

    schema: Dict[str, Any]
    parameters: InitVar[Mapping[str, Any]]

    def get_json_schema(self) -> Mapping[str, Any]:
        """Return the configured schema as-is (the same object, not a copy)."""
        return self.schema
Schema loader that returns the schema provided inline through its `schema` attribute, unchanged.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class DynamicSchemaLoader(SchemaLoader):
    """
    Dynamically loads a JSON Schema by extracting data from retrieved records.

    Only the first record returned by ``retriever.read_records({})`` is inspected. The steps are:

    1. ``schema_type_identifier.schema_pointer`` selects the property definitions in that record.
    2. Each definition's name is read via ``key_pointer``.
    3. Its type is read via ``type_pointer``, defaulting to ``"string"``.
    4. The type is optionally remapped through ``types_mapping``.
    5. The type is converted to an Airbyte type using ``AIRBYTE_DATA_TYPES``.
    6. ``schema_transformations`` are then applied to the resulting properties.

    Attributes:
        retriever (Retriever): Source of the record describing the schema
        config (Config): User configuration, used for interpolation and passed to transformations
        parameters (Mapping[str, Any]): Init-only; this class defines no ``__post_init__``, so it is not stored
        schema_type_identifier (SchemaTypeIdentifier): Pointers and type mappings used to parse the record
        schema_transformations (List[RecordTransformation]): Transformations applied to the generated properties
    """

    retriever: Retriever
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    schema_type_identifier: SchemaTypeIdentifier
    schema_transformations: List[RecordTransformation] = field(default_factory=lambda: [])

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Constructs a JSON Schema based on retrieved data.

        :return: A draft-07 object schema with ``additionalProperties: True``. Its ``properties``
            are derived from the first retrieved record and then transformed. If no record is
            retrieved, no properties are extracted, although the transformations still run.
        :raises ValueError: If a property key is not a string, or a type cannot be mapped to an
            Airbyte type (see ``_get_key`` / ``_get_type``).
        """
        properties = {}
        # Only the first record is used; None when the retriever yields nothing.
        retrieved_record = next(self.retriever.read_records({}), None)  # type: ignore[call-overload] # read_records return Iterable data type

        # Locate the property definitions inside the record. An empty schema_pointer makes
        # _extract_data return the record itself.
        raw_schema = (
            self._extract_data(
                retrieved_record,  # type: ignore[arg-type] # Expected that retrieved_record will be only Mapping[str, Any]
                self.schema_type_identifier.schema_pointer,
            )
            if retrieved_record
            else []
        )

        for property_definition in raw_schema:
            key = self._get_key(property_definition, self.schema_type_identifier.key_pointer)
            value = self._get_type(
                property_definition,
                self.schema_type_identifier.type_pointer,
            )
            # A repeated key overwrites the earlier definition.
            properties[key] = value

        transformed_properties = self._transform(properties, {})

        return {
            "$schema": "https://json-schema.org/draft-07/schema#",
            "type": "object",
            "additionalProperties": True,
            "properties": transformed_properties,
        }

    def _transform(
        self,
        properties: Mapping[str, Any],
        stream_state: StreamState,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Mapping[str, Any]:
        """
        Apply ``schema_transformations`` to ``properties`` in order and return the same object.

        ``stream_state`` and ``stream_slice`` are accepted but not forwarded to the transformations.
        The return value of ``transform`` is ignored, so transformations presumably mutate
        ``properties`` in place. NOTE(review): confirm against RecordTransformation.
        """
        for transformation in self.schema_transformations:
            transformation.transform(
                properties,  # type: ignore # properties has type Mapping[str, Any], but Dict[str, Any] expected
                config=self.config,
            )
        return properties

    def _get_key(
        self,
        raw_schema: MutableMapping[str, Any],
        field_key_path: List[Union[InterpolatedString, str]],
    ) -> str:
        """
        Extracts the key field from the schema using the specified path.

        :raises ValueError: If the value found at ``field_key_path`` is not a string. This
            includes the case where nothing is found, because ``_extract_data`` then returns ``None``.
        """
        field_key = self._extract_data(raw_schema, field_key_path)
        if not isinstance(field_key, str):
            raise ValueError(f"Expected key to be a string. Got {field_key}")
        return field_key

    def _get_type(
        self,
        raw_schema: MutableMapping[str, Any],
        field_type_path: Optional[List[Union[InterpolatedString, str]]],
    ) -> Union[Mapping[str, Any], List[Mapping[str, Any]]]:
        """
        Determines the JSON Schema type for a field, supporting nullable and combined types.

        The raw type is read from ``field_type_path``. It defaults to ``"string"`` when the path
        is not set or nothing is found there. It is then passed through ``types_mapping`` and
        converted as follows:

        * a list of exactly two strings -> ``{"oneOf": [<type 1>, <type 2>]}``
        * a string -> the matching Airbyte type
        * a ``ComplexFieldType`` -> resolved via ``_resolve_complex_type``

        :raises ValueError: For any other shape, or for a type name not in ``AIRBYTE_DATA_TYPES``.
        """
        raw_field_type = (
            self._extract_data(raw_schema, field_type_path, default="string")
            if field_type_path
            else "string"
        )
        mapped_field_type = self._replace_type_if_not_valid(raw_field_type, raw_schema)
        if (
            isinstance(mapped_field_type, list)
            and len(mapped_field_type) == 2
            and all(isinstance(item, str) for item in mapped_field_type)
        ):
            first_type = self._get_airbyte_type(mapped_field_type[0])
            second_type = self._get_airbyte_type(mapped_field_type[1])
            return {"oneOf": [first_type, second_type]}

        elif isinstance(mapped_field_type, str):
            return self._get_airbyte_type(mapped_field_type)

        elif isinstance(mapped_field_type, ComplexFieldType):
            return self._resolve_complex_type(mapped_field_type)

        else:
            raise ValueError(
                f"Invalid data type. Available string or two items list of string. Got {mapped_field_type}."
            )

    def _resolve_complex_type(self, complex_type: ComplexFieldType) -> Mapping[str, Any]:
        """
        Convert a ``ComplexFieldType`` into an Airbyte type definition.

        Without ``items``, the result is just the base type. Otherwise an ``"items"`` entry is
        added, resolved recursively when ``items`` is itself a ``ComplexFieldType``.
        """
        if not complex_type.items:
            return self._get_airbyte_type(complex_type.field_type)

        field_type = self._get_airbyte_type(complex_type.field_type)

        # Safe to mutate: _get_airbyte_type returns a deep copy.
        field_type["items"] = (
            self._get_airbyte_type(complex_type.items)
            if isinstance(complex_type.items, str)
            else self._resolve_complex_type(complex_type.items)
        )

        return field_type

    def _replace_type_if_not_valid(
        self,
        field_type: Union[List[str], str],
        raw_schema: MutableMapping[str, Any],
    ) -> Union[List[str], str, ComplexFieldType]:
        """
        Replaces a field type if it matches a type mapping in `types_map`.

        Mappings are checked in order. The first one wins if its ``current_type`` equals
        ``field_type`` and its condition is truthy. The condition is evaluated with ``config``
        and ``raw_schema`` available. Each mapping's condition is evaluated before the type
        comparison, even when the types do not match.

        :return: The mapped ``target_type``, or ``field_type`` unchanged if nothing matched.
        """
        if self.schema_type_identifier.types_mapping:
            for types_map in self.schema_type_identifier.types_mapping:
                # conditional is optional param, setting to true if not provided
                condition = InterpolatedBoolean(
                    condition=types_map.condition if types_map.condition is not None else "True",
                    parameters={},
                ).eval(config=self.config, raw_schema=raw_schema)

                if field_type == types_map.current_type and condition:
                    return types_map.target_type
        return field_type

    @staticmethod
    def _get_airbyte_type(field_type: str) -> MutableMapping[str, Any]:
        """
        Maps a field type to its corresponding Airbyte type definition.

        Returns a deep copy, so callers can modify the result without affecting
        ``AIRBYTE_DATA_TYPES``.

        :raises ValueError: If ``field_type`` is not a key of ``AIRBYTE_DATA_TYPES``.
        """
        if field_type not in AIRBYTE_DATA_TYPES:
            raise ValueError(f"Invalid Airbyte data type: {field_type}")

        return deepcopy(AIRBYTE_DATA_TYPES[field_type])

    def _extract_data(
        self,
        body: Mapping[str, Any],
        extraction_path: Optional[List[Union[InterpolatedString, str]]] = None,
        default: Any = None,
    ) -> Any:
        """
        Extracts data from the body based on the provided extraction path.

        An empty or missing path returns ``body`` itself. ``InterpolatedString`` path nodes are
        evaluated against ``config``, while plain strings are used as-is. The lookup uses
        ``dpath.get``, which returns ``default`` when the path is not present.
        """

        if not extraction_path:
            return body

        path = [
            node.eval(self.config) if not isinstance(node, str) else node
            for node in extraction_path
        ]

        return dpath.get(body, path, default=default)  # type: ignore # extracted will be a MutableMapping, given input data structure
Dynamically loads a JSON Schema by extracting data from retrieved records.
def get_json_schema(self) -> Mapping[str, Any]:
    """
    Build a draft-07 object schema from the first record the retriever yields.

    The property definitions are located with ``schema_pointer``. Each one contributes a
    property named via ``key_pointer`` and typed via ``type_pointer``. The collected
    properties are then passed through ``_transform``. When the retriever yields nothing,
    no definitions are processed.
    """
    identifier = self.schema_type_identifier
    first_record = next(self.retriever.read_records({}), None)  # type: ignore[call-overload] # read_records return Iterable data type

    if first_record:
        definitions = self._extract_data(first_record, identifier.schema_pointer)  # type: ignore[arg-type] # Expected that retrieved_record will be only Mapping[str, Any]
    else:
        definitions = []

    properties = {}
    for definition in definitions:
        name = self._get_key(definition, identifier.key_pointer)
        properties[name] = self._get_type(definition, identifier.type_pointer)

    return {
        "$schema": "https://json-schema.org/draft-07/schema#",
        "type": "object",
        "additionalProperties": True,
        "properties": self._transform(properties, {}),
    }
Constructs a JSON Schema based on retrieved data.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass(frozen=True)
class ComplexFieldType:
    """
    Describes a composite target type: a base ``field_type`` plus, for arrays, the element type.

    ``items`` may itself be a ``ComplexFieldType``, which allows nested arrays.
    """

    field_type: str
    items: Optional[Union[str, "ComplexFieldType"]] = None

    def __post_init__(self) -> None:
        """Reject a non-empty ``items`` unless ``field_type`` is ``"array"``."""
        is_array = self.field_type == "array"
        if not is_array and self.items:
            raise ValueError("'items' can only be used when 'field_type' is an array.")
Identifies complex field type
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass(frozen=True)
class TypesMap:
    """
    Represents a mapping between a current type and its corresponding target type.

    Attributes:
        target_type: The type to substitute when the mapping applies. A ``ComplexFieldType``
            describes an array type together with its ``items``.
        current_type: The raw type value, as extracted from the schema record, that this mapping
            matches by equality.
        condition: Optional interpolated boolean expression that must also evaluate truthy for the
            mapping to apply. ``None`` (the default) means the mapping always applies.
    """

    target_type: Union[List[str], str, ComplexFieldType]
    current_type: Union[List[str], str]
    # Defaults to None: consumers already treat a missing condition as "True", so callers
    # should not be forced to pass it explicitly.
    condition: Optional[str] = None
Represents a mapping between a current type and its corresponding target type.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class SchemaTypeIdentifier:
    """
    Identifies schema details for dynamic schema extraction and processing.

    Attributes:
        key_pointer: Path to a property's name inside each property definition
        parameters: Init-only; used when wrapping plain-string path nodes in ``InterpolatedString``
        type_pointer: Optional path to a property's type inside each property definition
        types_mapping: Optional list of ``TypesMap`` used to remap extracted types
        schema_pointer: Optional path to the property definitions inside a record;
            normalized to ``[]`` when not provided
    """

    key_pointer: List[Union[InterpolatedString, str]]
    parameters: InitVar[Mapping[str, Any]]
    type_pointer: Optional[List[Union[InterpolatedString, str]]] = None
    types_mapping: Optional[List[TypesMap]] = None
    schema_pointer: Optional[List[Union[InterpolatedString, str]]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # _update_pointer returns None for an empty/missing pointer; schema_pointer uses [] instead.
        self.schema_pointer = self._update_pointer(self.schema_pointer, parameters) or []  # type: ignore[assignment] # This is required field in model
        self.key_pointer = self._update_pointer(self.key_pointer, parameters)  # type: ignore[assignment] # This is required field in model
        self.type_pointer = self._update_pointer(self.type_pointer, parameters)

    @staticmethod
    def _update_pointer(
        pointer: Optional[List[Union[InterpolatedString, str]]], parameters: Mapping[str, Any]
    ) -> Optional[List[Union[InterpolatedString, str]]]:
        """Wrap each plain-string path node in an InterpolatedString; return None for an empty/missing pointer."""
        if not pointer:
            return None
        return [
            path if not isinstance(path, str) else InterpolatedString.create(path, parameters=parameters)
            for path in pointer
        ]
Identifies schema details for dynamic schema extraction and processing.