#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import copy
import hashlib
import json
from collections import defaultdict
from itertools import chain
from typing import Any, DefaultDict, Dict, Iterable, List, Optional, Tuple

from airbyte_cdk.sources.declarative.parsers.custom_exceptions import ManifestNormalizationException

# Type definitions for better readability
ManifestType = Dict[str, Any]
DefinitionsType = Dict[str, Any]
DuplicateOccurancesType = List[Tuple[List[str], Dict[str, Any], Dict[str, Any]]]
DuplicatesType = DefaultDict[str, DuplicateOccurancesType]

# Configuration constants
# minimum number of occurrences for a value to be treated as a duplicate
N_OCCURANCES = 2

DEF_TAG = "definitions"
LINKABLE_TAG = "linkable"
LINKED_TAG = "linked"
PROPERTIES_TAG = "properties"
SCHEMA_LOADER_TAG = "schema_loader"
SCHEMA_TAG = "schema"
SCHEMAS_TAG = "schemas"
STREAMS_TAG = "streams"


def _get_linkable_schema_tags(schema: DefinitionsType) -> List[str]:
    """
    Extract linkable tags from schema definitions.

    Identifies properties within a schema's definitions that are marked as linkable:
    traverses each definition in the schema, examines its properties, and collects
    the keys of properties that contain the LINKABLE_TAG.

    Args:
        schema (DefinitionsType): The schema definition dictionary to process.

    Returns:
        List[str]: A deduplicated list of property keys that are marked as linkable.
    """

    # the linkable scope: ['definitions.*']
    schema_definitions = schema.get(DEF_TAG, {})

    # for every definition, collect the property keys that carry the `linkable` marker
    all_linkable_tags = chain.from_iterable(
        (
            key
            for key, value in root_value.get(PROPERTIES_TAG, {}).items()
            if LINKABLE_TAG in value
        )
        for root_value in schema_definitions.values()
    )

    # return unique tags only
    return list(set(all_linkable_tags))


class ManifestNormalizer:
    """
    This class is responsible for normalizing the manifest by applying processing such as:
    - removing duplicated definitions
    - replacing them with references.

    To extend the functionality, use the `normalize()` method to include any additional processing steps.
    """

    def __init__(
        self,
        resolved_manifest: ManifestType,
        declarative_schema: DefinitionsType,
    ) -> None:
        self._resolved_manifest = resolved_manifest
        self._declarative_schema = declarative_schema
        # work on a deep copy so the original manifest can be returned intact on failure
        self._normalized_manifest: ManifestType = copy.deepcopy(self._resolved_manifest)
        # get the tags marked as `linkable` in the component schema
        self._linkable_tags = _get_linkable_schema_tags(self._declarative_schema)

    def to_json_str(self) -> str:
        """Serialize the normalized manifest to a pretty-printed JSON string."""
        return json.dumps(self._normalized_manifest, indent=2)

    def normalize(self) -> ManifestType:
        """
        Normalizes the manifest by deduplicating and resolving schema references.

        This method processes the manifest in two steps:
        1. Deduplicates elements within the manifest
        2. Resolves and references schemas

        Returns:
            ManifestType: The normalized manifest if processing succeeds,
            or the original resolved manifest if normalization fails.

        Raises:
            ManifestNormalizationException: Caught internally and handled by returning the original manifest.
        """
        try:
            self._deduplicate_manifest()
            self._reference_schemas()
            return self._normalized_manifest
        except ManifestNormalizationException:
            # if any error occurs, we just return the original manifest.
            # TODO: enable debug logging
            return self._resolved_manifest

    def _get_manifest_streams(self) -> Iterable[Dict[str, Any]]:
        """
        Get the streams from the manifest.

        Yields:
            Each stream dictionary; nothing if the `streams` tag is absent.
        """
        yield from self._normalized_manifest.get(STREAMS_TAG, [])

    def _deduplicate_manifest(self) -> None:
        """
        Find commonalities in the input JSON structure and refactor it to avoid redundancy.

        Raises:
            ManifestNormalizationException: If any step of deduplication fails.
        """
        try:
            # prepare the `definitions` tag
            self._prepare_definitions()
            # replace duplicates with references, if any
            self._handle_duplicates(self._collect_duplicates())
        except Exception as e:
            raise ManifestNormalizationException(str(e)) from e

    def _prepare_definitions(self) -> None:
        """
        Clean the definitions in the manifest by removing unnecessary properties.
        This function modifies the manifest in place.
        """
        definitions = self._normalized_manifest.setdefault(DEF_TAG, {})
        definitions.setdefault(LINKED_TAG, {})

        # remove everything from the definitions tag except `linked`
        for key in list(definitions.keys()):
            if key != LINKED_TAG:
                definitions.pop(key, None)

    def _extract_stream_schema(self, stream: Dict[str, Any]) -> None:
        """
        Extract the schema from the stream and add it to the `schemas` tag,
        keyed by the stream name. Existing entries are not overwritten.
        """
        stream_name = stream["name"]
        # copy the value of the SCHEMA_TAG to the SCHEMAS_TAG with the stream name as key
        schema = stream.get(SCHEMA_LOADER_TAG, {}).get(SCHEMA_TAG)
        schemas = self._normalized_manifest.setdefault(SCHEMAS_TAG, {})
        if stream_name not in schemas:
            schemas[stream_name] = schema

    def _reference_schemas(self) -> None:
        """
        Set the schema reference for each stream in the manifest.
        This function modifies the manifest in place.
        """
        # reference the stream schema for the stream to where it's stored
        if SCHEMAS_TAG in self._normalized_manifest:
            for stream in self._get_manifest_streams():
                self._extract_stream_schema(stream)
                self._set_stream_schema_ref(stream)

    def _set_stream_schema_ref(self, stream: Dict[str, Any]) -> None:
        """
        Set the schema reference for the given stream in the manifest.
        This function modifies the manifest in place.
        """
        stream_name = stream["name"]
        if SCHEMAS_TAG in self._normalized_manifest:
            if stream_name in self._normalized_manifest[SCHEMAS_TAG]:
                stream[SCHEMA_LOADER_TAG][SCHEMA_TAG] = self._create_schema_ref(stream_name)

    def _replace_duplicates_with_refs(self, duplicates: DuplicatesType) -> None:
        """
        Process duplicate objects and replace them with references.

        Args:
            duplicates: The duplicates dictionary collected from the given manifest.
        """
        for occurrences in duplicates.values():
            type_key, key, value = self._get_occurance_samples(occurrences)
            is_linked_def = self._is_linked_definition(type_key, key)

            # Add to definitions if not there already
            if not is_linked_def:
                self._add_to_linked_definitions(type_key, key, value)

            # Replace occurrences with references. When the definition already
            # existed, only occurrences whose value matches it are replaced.
            for _, parent_obj, value in occurrences:
                if not is_linked_def or value == self._get_linked_definition_value(type_key, key):
                    parent_obj[key] = self._create_linked_definition_ref(type_key, key)

    def _handle_duplicates(self, duplicates: DuplicatesType) -> None:
        """
        Process the duplicates and replace them with references.

        Args:
            duplicates: The duplicates dictionary collected from the given manifest.
        """
        if duplicates:
            self._replace_duplicates_with_refs(duplicates)

    def _add_duplicate(
        self,
        duplicates: DuplicatesType,
        current_path: List[str],
        obj: Dict[str, Any],
        value: Any,
        key: Optional[str] = None,
    ) -> None:
        """
        Adds a duplicate record of an observed object by computing a unique hash for the provided value.

        This function computes a hash for the given value (or a dictionary composed of the key and value
        if a key is provided) and appends a tuple containing the current path, the original object, and
        the value to the duplicates dictionary under the corresponding hash.

        Parameters:
            duplicates (DuplicatesType): The dictionary to store duplicate records.
            current_path (List[str]): The list of keys or indices representing the current location in the object hierarchy.
            obj (Dict): The original dictionary object where the duplicate is observed.
            value (Any): The value to be hashed and used for identifying duplicates.
            key (Optional[str]): An optional key that, if provided, wraps the value in a dictionary before hashing.
        """
        # create hash for each duplicate observed
        value_to_hash = {key: value} if key is not None else value
        duplicates[self._hash_object(value_to_hash)].append((current_path, obj, value))

    def _add_to_linked_definitions(
        self,
        type_key: str,
        key: str,
        value: Any,
    ) -> None:
        """
        Add a value to the linked definitions under the specified type and key.
        Existing entries are not overwritten.

        Args:
            type_key: The component type bucket to use
            key: The key to use
            value: The value to add
        """
        type_bucket = self._normalized_manifest[DEF_TAG][LINKED_TAG].setdefault(type_key, {})
        if key not in type_bucket:
            type_bucket[key] = value

    def _collect_duplicates(self) -> DuplicatesType:
        """
        Traverse the JSON object and collect all potential duplicate values and objects.

        Returns:
            duplicates: A dictionary of duplicate objects.

        Raises:
            ManifestNormalizationException: If traversal fails.
        """

        def _collect(obj: Dict[str, Any], path: Optional[List[str]] = None) -> None:
            """
            The closure to recursively collect duplicates in the JSON object.

            Args:
                obj: The current object being analyzed.
                path: The current path in the object hierarchy.
            """
            if not isinstance(obj, dict):
                return

            path = [] if path is None else path
            for key, value in obj.items():
                # do not collect duplicates from `definitions` tag
                if key == DEF_TAG:
                    continue

                current_path = path + [key]

                if isinstance(value, dict):
                    # First process nested dictionaries
                    _collect(value, current_path)
                    # Process allowed-only component tags
                    if key in self._linkable_tags:
                        self._add_duplicate(duplicates, current_path, obj, value)

                # handle primitive types
                elif isinstance(value, (str, int, float, bool)):
                    # Process allowed-only field tags
                    if key in self._linkable_tags:
                        self._add_duplicate(duplicates, current_path, obj, value, key)

                # handle list cases
                elif isinstance(value, list):
                    for i, item in enumerate(value):
                        _collect(item, current_path + [str(i)])

        duplicates: DuplicatesType = defaultdict(list)
        try:
            if self._linkable_tags:
                _collect(self._normalized_manifest)
                # clean non-duplicates and sort based on the count of occurrences
                return self._clean_and_sort_duplicates(duplicates)
            return duplicates
        except Exception as e:
            raise ManifestNormalizationException(str(e)) from e

    def _clean_and_sort_duplicates(self, duplicates: DuplicatesType) -> DuplicatesType:
        """
        Clean non-duplicates and sort the duplicates by their occurrences.

        Args:
            duplicates: The duplicates dictionary to sort

        Returns:
            A sorted duplicates dictionary.
        """
        # drop entries that were observed fewer than N_OCCURANCES times
        real_duplicates = {k: v for k, v in duplicates.items() if len(v) >= N_OCCURANCES}

        # sort by occurrence count, more frequent ones go first
        return defaultdict(
            list,
            sorted(real_duplicates.items(), key=lambda item: len(item[1]), reverse=True),
        )

    def _hash_object(self, obj: Dict[str, Any]) -> str:
        """
        Create a unique hash for a dictionary object.

        Args:
            obj: The dictionary to hash

        Returns:
            A hashed string
        """
        # Sort keys to ensure a consistent hash for the same content.
        # md5 is used for fingerprinting only, not for security.
        return hashlib.md5(json.dumps(obj, sort_keys=True).encode()).hexdigest()

    def _is_linked_definition(self, type_key: str, key: str) -> bool:
        """
        Check if the key already exists in the linked definitions.

        Args:
            type_key: The component type bucket to check
            key: The key to check

        Returns:
            True if the key exists in the linked definitions, False otherwise
        """
        linked = self._normalized_manifest[DEF_TAG][LINKED_TAG]
        return type_key in linked and key in linked[type_key]

    def _get_linked_definition_value(self, type_key: str, key: str) -> Any:
        """
        Get the value of a linked definition by its key.

        Args:
            type_key: The component type bucket to check
            key: The key to check

        Returns:
            The value of the linked definition

        Raises:
            ManifestNormalizationException: If the type or key is not present.
        """
        linked = self._normalized_manifest[DEF_TAG][LINKED_TAG]
        try:
            return linked[type_key][key]
        except KeyError:
            # previously a missing `type_key` silently returned None; raise consistently instead
            raise ManifestNormalizationException(
                f"Key {key} not found in linked definitions. Please check the manifest."
            )

    def _get_occurance_samples(self, occurrences: DuplicateOccurancesType) -> Tuple[str, str, Any]:
        """
        Get the type, key and value from the occurrences list.

        Args:
            occurrences: The occurrences list

        Returns:
            The component type, the key and the value from the first occurrence
        """
        # Take the value from the first occurrence, as they are the same
        path, obj, value = occurrences[0]
        return (
            obj["type"],
            path[-1],  # the component's name is the last part of its path
            value,
        )

    def _create_linked_definition_ref(self, type_key: str, key: str) -> Dict[str, str]:
        """
        Create a reference object for the linked definitions using the specified type and key.

        Args:
            type_key: The component type bucket
            key: The reference key to use

        Returns:
            A reference object in the proper format
        """
        return {"$ref": f"#/{DEF_TAG}/{LINKED_TAG}/{type_key}/{key}"}

    def _create_schema_ref(self, key: str) -> Dict[str, str]:
        """
        Create a reference object for stream schema using the specified key.

        Args:
            key: The reference key to use

        Returns:
            A reference object in the proper format
        """
        return {"$ref": f"#/{SCHEMAS_TAG}/{key}"}
ManifestType =
typing.Dict[str, typing.Any]
DefinitionsType =
typing.Dict[str, typing.Any]
DuplicateOccurancesType =
typing.List[typing.Tuple[typing.List[str], typing.Dict[str, typing.Any], typing.Dict[str, typing.Any]]]
DuplicatesType =
typing.DefaultDict[str, typing.List[typing.Tuple[typing.List[str], typing.Dict[str, typing.Any], typing.Dict[str, typing.Any]]]]
N_OCCURANCES =
2
DEF_TAG =
'definitions'
LINKABLE_TAG =
'linkable'
LINKED_TAG =
'linked'
PROPERTIES_TAG =
'properties'
SCHEMA_LOADER_TAG =
'schema_loader'
SCHEMA_TAG =
'schema'
SCHEMAS_TAG =
'schemas'
STREAMS_TAG =
'streams'
class
ManifestNormalizer:
73class ManifestNormalizer: 74 """ 75 This class is responsible for normalizing the manifest by appliying processing such as: 76 - removing duplicated definitions 77 - replacing them with references. 78 79 To extend the functionality, use the `normilize()` method to include any additional processing steps. 80 """ 81 82 def __init__( 83 self, 84 resolved_manifest: ManifestType, 85 declarative_schema: DefinitionsType, 86 ) -> None: 87 self._resolved_manifest = resolved_manifest 88 self._declarative_schema = declarative_schema 89 self._normalized_manifest: ManifestType = copy.deepcopy(self._resolved_manifest) 90 # get the tags marked as `linkable` in the component schema 91 self._linkable_tags = _get_linkable_schema_tags(self._declarative_schema) 92 93 def to_json_str(self) -> str: 94 return json.dumps(self._normalized_manifest, indent=2) 95 96 def normalize(self) -> ManifestType: 97 """ 98 Normalizes the manifest by deduplicating and resolving schema references. 99 100 This method processes the manifest in two steps: 101 1. Deduplicates elements within the manifest 102 2. Resolves and references schemas 103 104 Returns: 105 ManifestType: The normalized manifest if processing succeeds, 106 or the original resolved manifest if normalization fails. 107 108 Raises: 109 ManifestNormalizationException: Caught internally and handled by returning the original manifest. 110 """ 111 try: 112 self._deduplicate_minifest() 113 self._reference_schemas() 114 115 return self._normalized_manifest 116 except ManifestNormalizationException: 117 # if any error occurs, we just return the original manifest. 118 # TODO: enable debug logging 119 return self._resolved_manifest 120 121 def _get_manifest_streams(self) -> Iterable[Dict[str, Any]]: 122 """ 123 Get the streams from the manifest. 124 125 Returns: 126 An Iterable of streams. 
127 """ 128 129 if STREAMS_TAG in self._normalized_manifest.keys(): 130 for stream in self._normalized_manifest[STREAMS_TAG]: 131 yield stream 132 133 yield from [] 134 135 def _deduplicate_minifest(self) -> None: 136 """ 137 Find commonalities in the input JSON structure and refactor it to avoid redundancy. 138 """ 139 140 try: 141 # prepare the `definitions` tag 142 self._prepare_definitions() 143 # replace duplicates with references, if any 144 self._handle_duplicates(self._collect_duplicates()) 145 except Exception as e: 146 raise ManifestNormalizationException(str(e)) 147 148 def _prepare_definitions(self) -> None: 149 """ 150 Clean the definitions in the manifest by removing unnecessary properties. 151 This function modifies the manifest in place. 152 """ 153 154 # Check if the definitions tag exists 155 if not DEF_TAG in self._normalized_manifest: 156 self._normalized_manifest[DEF_TAG] = {} 157 158 # Check if the linked tag exists 159 if not LINKED_TAG in self._normalized_manifest[DEF_TAG]: 160 self._normalized_manifest[DEF_TAG][LINKED_TAG] = {} 161 162 # remove everything from definitions tag except of `linked`, after processing 163 for key in list(self._normalized_manifest[DEF_TAG].keys()): 164 if key != LINKED_TAG: 165 self._normalized_manifest[DEF_TAG].pop(key, None) 166 167 def _extract_stream_schema(self, stream: Dict[str, Any]) -> None: 168 """ 169 Extract the schema from the stream and add it to the `schemas` tag. 
170 """ 171 172 stream_name = stream["name"] 173 # copy the value of the SCHEMA_TAG to the SCHEMAS_TAG with the stream name as key 174 schema = stream.get(SCHEMA_LOADER_TAG, {}).get(SCHEMA_TAG) 175 if not SCHEMAS_TAG in self._normalized_manifest.keys(): 176 self._normalized_manifest[SCHEMAS_TAG] = {} 177 # add stream schema to the SCHEMAS_TAG 178 if not stream_name in self._normalized_manifest[SCHEMAS_TAG].keys(): 179 # add the schema to the SCHEMAS_TAG with the stream name as key 180 self._normalized_manifest[SCHEMAS_TAG][stream_name] = schema 181 182 def _reference_schemas(self) -> None: 183 """ 184 Set the schema reference for the given stream in the manifest. 185 This function modifies the manifest in place. 186 """ 187 188 # reference the stream schema for the stream to where it's stored 189 if SCHEMAS_TAG in self._normalized_manifest.keys(): 190 for stream in self._get_manifest_streams(): 191 self._extract_stream_schema(stream) 192 self._set_stream_schema_ref(stream) 193 194 def _set_stream_schema_ref(self, stream: Dict[str, Any]) -> None: 195 """ 196 Set the schema reference for the given stream in the manifest. 197 This function modifies the manifest in place. 198 """ 199 stream_name = stream["name"] 200 if SCHEMAS_TAG in self._normalized_manifest.keys(): 201 if stream_name in self._normalized_manifest[SCHEMAS_TAG]: 202 stream[SCHEMA_LOADER_TAG][SCHEMA_TAG] = self._create_schema_ref(stream_name) 203 204 def _replace_duplicates_with_refs(self, duplicates: DuplicatesType) -> None: 205 """ 206 Process duplicate objects and replace them with references. 207 208 Args: 209 duplicates: The duplicates dictionary collected from the given manifest. 
210 """ 211 212 for _, occurrences in duplicates.items(): 213 type_key, key, value = self._get_occurance_samples(occurrences) 214 is_linked_def = self._is_linked_definition(type_key, key) 215 216 # Add to definitions if not there already 217 if not is_linked_def: 218 self._add_to_linked_definitions(type_key, key, value) 219 220 # Replace occurrences with references 221 for _, parent_obj, value in occurrences: 222 if is_linked_def: 223 if value == self._get_linked_definition_value(type_key, key): 224 parent_obj[key] = self._create_linked_definition_ref(type_key, key) 225 else: 226 parent_obj[key] = self._create_linked_definition_ref(type_key, key) 227 228 def _handle_duplicates(self, duplicates: DuplicatesType) -> None: 229 """ 230 Process the duplicates and replace them with references. 231 232 Args: 233 duplicates: The duplicates dictionary collected from the given manifest. 234 """ 235 236 if len(duplicates) > 0: 237 self._replace_duplicates_with_refs(duplicates) 238 239 def _add_duplicate( 240 self, 241 duplicates: DuplicatesType, 242 current_path: List[str], 243 obj: Dict[str, Any], 244 value: Any, 245 key: Optional[str] = None, 246 ) -> None: 247 """ 248 Adds a duplicate record of an observed object by computing a unique hash for the provided value. 249 250 This function computes a hash for the given value (or a dictionary composed of the key and value if a key is provided) 251 and appends a tuple containing the current path, the original object, and the value to the duplicates 252 dictionary under the corresponding hash. 253 254 Parameters: 255 duplicates (DuplicatesType): The dictionary to store duplicate records. 256 current_path (List[str]): The list of keys or indices representing the current location in the object hierarchy. 257 obj (Dict): The original dictionary object where the duplicate is observed. 258 value (Any): The value to be hashed and used for identifying duplicates. 
259 key (Optional[str]): An optional key that, if provided, wraps the value in a dictionary before hashing. 260 """ 261 262 # create hash for each duplicate observed 263 value_to_hash = {key: value} if key is not None else value 264 duplicates[self._hash_object(value_to_hash)].append((current_path, obj, value)) 265 266 def _add_to_linked_definitions( 267 self, 268 type_key: str, 269 key: str, 270 value: Any, 271 ) -> None: 272 """ 273 Add a value to the linked definitions under the specified key. 274 275 Args: 276 definitions: The definitions dictionary to modify 277 key: The key to use 278 value: The value to add 279 """ 280 if type_key not in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys(): 281 self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key] = {} 282 283 if key not in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys(): 284 self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key][key] = value 285 286 def _collect_duplicates(self) -> DuplicatesType: 287 """ 288 Traverse the JSON object and collect all potential duplicate values and objects. 289 290 Returns: 291 duplicates: A dictionary of duplicate objects. 292 """ 293 294 def _collect(obj: Dict[str, Any], path: Optional[List[str]] = None) -> None: 295 """ 296 The closure to recursively collect duplicates in the JSON object. 297 298 Args: 299 obj: The current object being analyzed. 300 path: The current path in the object hierarchy. 
301 """ 302 303 if not isinstance(obj, dict): 304 return 305 306 path = [] if path is None else path 307 # Check if the object is empty 308 for key, value in obj.items(): 309 # do not collect duplicates from `definitions` tag 310 if key == DEF_TAG: 311 continue 312 313 current_path = path + [key] 314 315 if isinstance(value, dict): 316 # First process nested dictionaries 317 _collect(value, current_path) 318 # Process allowed-only component tags 319 if key in self._linkable_tags: 320 self._add_duplicate(duplicates, current_path, obj, value) 321 322 # handle primitive types 323 elif isinstance(value, (str, int, float, bool)): 324 # Process allowed-only field tags 325 if key in self._linkable_tags: 326 self._add_duplicate(duplicates, current_path, obj, value, key) 327 328 # handle list cases 329 elif isinstance(value, list): 330 for i, item in enumerate(value): 331 _collect(item, current_path + [str(i)]) 332 333 duplicates: DuplicatesType = defaultdict(list, {}) 334 try: 335 if self._linkable_tags: 336 _collect(self._normalized_manifest) 337 # clean non-duplicates and sort based on the count of occurrences 338 return self._clean_and_sort_duplicates(duplicates) 339 return duplicates 340 except Exception as e: 341 raise ManifestNormalizationException(str(e)) 342 343 def _clean_and_sort_duplicates(self, duplicates: DuplicatesType) -> DuplicatesType: 344 """ 345 Clean non-duplicates and sort the duplicates by their occurrences. 346 347 Args: 348 duplicates: The duplicates dictionary to sort 349 350 Returns: 351 A sorted duplicates dictionary. 
352 """ 353 354 # clean non-duplicates 355 duplicates = defaultdict( 356 list, 357 {k: v for k, v in duplicates.items() if len(v) >= N_OCCURANCES}, 358 ) 359 360 # sort the duplicates by their occurrences, more frequent ones go first 361 duplicates = defaultdict( 362 list, 363 {k: v for k, v in sorted(duplicates.items(), key=lambda x: len(x[1]), reverse=True)}, 364 ) 365 366 return duplicates 367 368 def _hash_object(self, obj: Dict[str, Any]) -> str: 369 """ 370 Create a unique hash for a dictionary object. 371 372 Args: 373 node: The dictionary to hash 374 375 Returns: 376 A hashed string 377 """ 378 379 # Sort keys to ensure consistent hash for same content 380 return hashlib.md5(json.dumps(obj, sort_keys=True).encode()).hexdigest() 381 382 def _is_linked_definition(self, type_key: str, key: str) -> bool: 383 """ 384 Check if the key already exists in the linked definitions. 385 386 Args: 387 key: The key to check 388 definitions: The definitions dictionary with definitions 389 390 Returns: 391 True if the key exists in the linked definitions, False otherwise 392 """ 393 394 if type_key in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys(): 395 # Check if the key exists in the linked definitions 396 if key in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys(): 397 return True 398 399 return False 400 401 def _get_linked_definition_value(self, type_key: str, key: str) -> Any: 402 """ 403 Get the value of a linked definition by its key. 404 405 Args: 406 key: The key to check 407 definitions: The definitions dictionary with definitions 408 409 Returns: 410 The value of the linked definition 411 """ 412 if type_key in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys(): 413 if key in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys(): 414 return self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key][key] 415 else: 416 raise ManifestNormalizationException( 417 f"Key {key} not found in linked definitions. Please check the manifest." 
418 ) 419 420 def _get_occurance_samples(self, occurrences: DuplicateOccurancesType) -> Tuple[str, str, Any]: 421 """ 422 Get the key from the occurrences list. 423 424 Args: 425 occurrences: The occurrences list 426 427 Returns: 428 The key, type and value from the occurrences 429 """ 430 431 # Take the value from the first occurrence, as they are the same 432 path, obj, value = occurrences[0] 433 return ( 434 obj["type"], 435 path[-1], 436 value, 437 ) # Return the component's name as the last part of its path 438 439 def _create_linked_definition_ref(self, type_key: str, key: str) -> Dict[str, str]: 440 """ 441 Create a reference object for the linked definitions using the specified key. 442 443 Args: 444 ref_key: The reference key to use 445 446 Returns: 447 A reference object in the proper format 448 """ 449 450 return {"$ref": f"#/{DEF_TAG}/{LINKED_TAG}/{type_key}/{key}"} 451 452 def _create_schema_ref(self, key: str) -> Dict[str, str]: 453 """ 454 Create a reference object for stream schema using the specified key. 455 456 Args: 457 key: The reference key to use 458 459 Returns: 460 A reference object in the proper format 461 """ 462 463 return {"$ref": f"#/{SCHEMAS_TAG}/{key}"}
This class is responsible for normalizing the manifest by applying processing such as:
- removing duplicated definitions
- replacing them with references.
To extend the functionality, use the normalize()
method to include any additional processing steps.
ManifestNormalizer( resolved_manifest: Dict[str, Any], declarative_schema: Dict[str, Any])
82 def __init__( 83 self, 84 resolved_manifest: ManifestType, 85 declarative_schema: DefinitionsType, 86 ) -> None: 87 self._resolved_manifest = resolved_manifest 88 self._declarative_schema = declarative_schema 89 self._normalized_manifest: ManifestType = copy.deepcopy(self._resolved_manifest) 90 # get the tags marked as `linkable` in the component schema 91 self._linkable_tags = _get_linkable_schema_tags(self._declarative_schema)
def
normalize(self) -> Dict[str, Any]:
96 def normalize(self) -> ManifestType: 97 """ 98 Normalizes the manifest by deduplicating and resolving schema references. 99 100 This method processes the manifest in two steps: 101 1. Deduplicates elements within the manifest 102 2. Resolves and references schemas 103 104 Returns: 105 ManifestType: The normalized manifest if processing succeeds, 106 or the original resolved manifest if normalization fails. 107 108 Raises: 109 ManifestNormalizationException: Caught internally and handled by returning the original manifest. 110 """ 111 try: 112 self._deduplicate_minifest() 113 self._reference_schemas() 114 115 return self._normalized_manifest 116 except ManifestNormalizationException: 117 # if any error occurs, we just return the original manifest. 118 # TODO: enable debug logging 119 return self._resolved_manifest
Normalizes the manifest by deduplicating and resolving schema references.
This method processes the manifest in two steps:
- Deduplicates elements within the manifest
- Resolves and references schemas
Returns:
ManifestType: The normalized manifest if processing succeeds, or the original resolved manifest if normalization fails.
Raises:
- ManifestNormalizationException: Caught internally and handled by returning the original manifest.