airbyte_cdk.sources.declarative.parsers.manifest_normalizer
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import copy
import hashlib
import json
from collections import defaultdict
from itertools import chain
from typing import Any, Callable, DefaultDict, Dict, Iterable, List, Optional, Tuple

from airbyte_cdk.sources.declarative.parsers.custom_exceptions import ManifestNormalizationException

# Type definitions for better readability
ManifestType = Dict[str, Any]
DefinitionsType = Dict[str, Any]
DuplicateOccurancesType = List[Tuple[List[str], Dict[str, Any], Dict[str, Any]]]
DuplicatesType = DefaultDict[str, DuplicateOccurancesType]

# Configuration constants
N_OCCURANCES = 2

DEF_TAG = "definitions"
LINKABLE_TAG = "linkable"
LINKED_TAG = "linked"
PROPERTIES_TAG = "properties"
SCHEMA_LOADER_TAG = "schema_loader"
SCHEMA_TAG = "schema"
SCHEMAS_TAG = "schemas"
STREAMS_TAG = "streams"


def _get_linkable_schema_tags(schema: DefinitionsType) -> List[str]:
    """
    Extract the property keys marked as linkable from schema definitions.

    Traverses each definition under the schema's `definitions` tag, examines its
    `properties`, and collects the keys of properties that carry the LINKABLE_TAG.

    Args:
        schema (DefinitionsType): The schema definition dictionary to process.

    Returns:
        List[str]: A deduplicated list of property keys that are marked as linkable.
    """

    # the linkable scope: ['definitions.*']
    schema_definitions = schema.get(DEF_TAG, {})

    # Plain functions instead of assigned lambdas (PEP 8 E731), same logic as before.
    def extract_linkable_keys(properties: Dict[str, Dict[str, Any]]) -> List[str]:
        # A property is linkable when it declares the LINKABLE_TAG marker.
        return [key for key, value in properties.items() if LINKABLE_TAG in value]

    def process_root(root_value: Dict[str, Any]) -> List[str]:
        # Each root definition exposes its candidates under `properties`.
        return extract_linkable_keys(root_value.get(PROPERTIES_TAG, {}))

    # Flatten linkable keys across all definitions and return unique tags only.
    return list(set(chain.from_iterable(map(process_root, schema_definitions.values()))))
79 """ 80 81 def __init__( 82 self, 83 resolved_manifest: ManifestType, 84 declarative_schema: DefinitionsType, 85 ) -> None: 86 self._resolved_manifest = resolved_manifest 87 self._declarative_schema = declarative_schema 88 self._normalized_manifest: ManifestType = copy.deepcopy(self._resolved_manifest) 89 # get the tags marked as `linkable` in the component schema 90 self._linkable_tags = _get_linkable_schema_tags(self._declarative_schema) 91 92 def to_json_str(self) -> str: 93 return json.dumps(self._normalized_manifest, indent=2) 94 95 def normalize(self) -> ManifestType: 96 """ 97 Normalizes the manifest by deduplicating and resolving schema references. 98 99 This method processes the manifest in two steps: 100 1. Deduplicates elements within the manifest 101 2. Resolves and references schemas 102 103 Returns: 104 ManifestType: The normalized manifest if processing succeeds, 105 or the original resolved manifest if normalization fails. 106 107 Raises: 108 ManifestNormalizationException: Caught internally and handled by returning the original manifest. 109 """ 110 try: 111 self._deduplicate_manifest() 112 113 return self._normalized_manifest 114 except ManifestNormalizationException: 115 # if any error occurs, we just return the original manifest. 116 # TODO: enable debug logging 117 return self._resolved_manifest 118 119 def _get_manifest_streams(self) -> Iterable[Dict[str, Any]]: 120 """ 121 Get the streams from the manifest. 122 123 Returns: 124 An Iterable of streams. 125 """ 126 127 if STREAMS_TAG in self._normalized_manifest.keys(): 128 for stream in self._normalized_manifest[STREAMS_TAG]: 129 yield stream 130 131 yield from [] 132 133 def _deduplicate_manifest(self) -> None: 134 """ 135 Find commonalities in the input JSON structure and refactor it to avoid redundancy. 
136 """ 137 138 try: 139 # prepare the `definitions` tag 140 self._prepare_definitions() 141 # replace duplicates with references, if any 142 self._handle_duplicates(self._collect_duplicates()) 143 # replace parent streams with $refs 144 self._replace_parent_streams_with_refs() 145 # clean dangling fields after resolving $refs 146 self._clean_dangling_fields() 147 except Exception as e: 148 raise ManifestNormalizationException(str(e)) 149 150 def _replace_parent_streams_with_refs(self) -> None: 151 """ 152 For each stream in the manifest, if it has a retriever.partition_router with parent_stream_configs, 153 replace any 'stream' fields in those configs that are dicts and deeply equal to another stream object 154 with a $ref to the correct stream index. 155 """ 156 streams = self._normalized_manifest.get(STREAMS_TAG, []) 157 158 # Build a hash-to-index mapping for O(1) lookups 159 stream_hash_to_index = {} 160 for idx, stream in enumerate(streams): 161 stream_hash = self._hash_object(stream) 162 stream_hash_to_index[stream_hash] = idx 163 164 for idx, stream in enumerate(streams): 165 retriever = stream.get("retriever") 166 if not retriever: 167 continue 168 partition_router = retriever.get("partition_router") 169 routers = ( 170 partition_router 171 if isinstance(partition_router, list) 172 else [partition_router] 173 if partition_router 174 else [] 175 ) 176 for router in routers: 177 if not isinstance(router, dict): 178 continue 179 if router.get("type") != "SubstreamPartitionRouter": 180 continue 181 parent_stream_configs = router.get("parent_stream_configs", []) 182 for parent_config in parent_stream_configs: 183 if not isinstance(parent_config, dict): 184 continue 185 stream_ref = parent_config.get("stream") 186 # Only replace if it's a dict and matches any stream in the manifest 187 if stream_ref is not None and isinstance(stream_ref, dict): 188 stream_ref_hash = self._hash_object(stream_ref) 189 if stream_ref_hash in stream_hash_to_index: 190 
parent_config["stream"] = { 191 "$ref": f"#/streams/{stream_hash_to_index[stream_ref_hash]}" 192 } 193 194 def _clean_dangling_fields(self) -> None: 195 """ 196 Clean the manifest by removing unused definitions and schemas. 197 This method removes any definitions or schemas that are not referenced by any $ref in the manifest. 198 """ 199 200 def find_all_refs(obj: Dict[str, Any], refs: List[str]) -> None: 201 """ 202 Recursively find all $ref paths in the object. 203 204 Args: 205 obj: The object to search through 206 refs: List to store found reference paths 207 """ 208 if not isinstance(obj, dict): 209 return 210 211 for key, value in obj.items(): 212 if key == "$ref" and isinstance(value, str): 213 # Remove the leading #/ from the ref path 214 refs.append(value[2:]) 215 elif isinstance(value, dict): 216 find_all_refs(value, refs) 217 elif isinstance(value, list): 218 for item in value: 219 if isinstance(item, dict): 220 find_all_refs(item, refs) 221 222 def clean_section(section: Dict[str, Any], section_path: str) -> None: 223 """ 224 Clean a section by removing unreferenced fields. 
225 226 Args: 227 section: The section to clean 228 section_path: The path to this section in the manifest 229 """ 230 for key in list(section.keys()): 231 current_path = f"{section_path}/{key}" 232 # Check if this path is referenced or is a parent of a referenced path 233 if not any(ref.startswith(current_path) for ref in all_refs): 234 del section[key] 235 236 # Find all references in the manifest 237 all_refs: List[str] = [] 238 find_all_refs(self._normalized_manifest, all_refs) 239 240 # Clean definitions 241 if DEF_TAG in self._normalized_manifest: 242 clean_section(self._normalized_manifest[DEF_TAG], DEF_TAG) 243 # Remove empty definitions section 244 if not self._normalized_manifest[DEF_TAG]: 245 del self._normalized_manifest[DEF_TAG] 246 247 # Clean schemas 248 if SCHEMAS_TAG in self._normalized_manifest: 249 clean_section(self._normalized_manifest[SCHEMAS_TAG], SCHEMAS_TAG) 250 # Remove empty schemas section 251 if not self._normalized_manifest[SCHEMAS_TAG]: 252 del self._normalized_manifest[SCHEMAS_TAG] 253 254 def _prepare_definitions(self) -> None: 255 """ 256 Clean the definitions in the manifest by removing unnecessary properties. 257 This function modifies the manifest in place. 258 """ 259 260 # Check if the definitions tag exists 261 if not DEF_TAG in self._normalized_manifest: 262 self._normalized_manifest[DEF_TAG] = {} 263 264 # Check if the linked tag exists 265 if not LINKED_TAG in self._normalized_manifest[DEF_TAG]: 266 self._normalized_manifest[DEF_TAG][LINKED_TAG] = {} 267 268 # remove everything from definitions tag except of `linked`, after processing 269 for key in list(self._normalized_manifest[DEF_TAG].keys()): 270 if key != LINKED_TAG: 271 self._normalized_manifest[DEF_TAG].pop(key, None) 272 273 def _replace_duplicates_with_refs(self, duplicates: DuplicatesType) -> None: 274 """ 275 Process duplicate objects and replace them with references. 276 277 Args: 278 duplicates: The duplicates dictionary collected from the given manifest. 
279 """ 280 281 for _, occurrences in duplicates.items(): 282 type_key, key, value = self._get_occurance_samples(occurrences) 283 is_linked_def = self._is_linked_definition(type_key, key) 284 285 # Add to definitions if not there already 286 if not is_linked_def: 287 self._add_to_linked_definitions(type_key, key, value) 288 289 # Replace occurrences with references 290 for _, parent_obj, value in occurrences: 291 if is_linked_def: 292 if value == self._get_linked_definition_value(type_key, key): 293 parent_obj[key] = self._create_linked_definition_ref(type_key, key) 294 else: 295 parent_obj[key] = self._create_linked_definition_ref(type_key, key) 296 297 def _handle_duplicates(self, duplicates: DuplicatesType) -> None: 298 """ 299 Process the duplicates and replace them with references. 300 301 Args: 302 duplicates: The duplicates dictionary collected from the given manifest. 303 """ 304 305 if len(duplicates) > 0: 306 self._replace_duplicates_with_refs(duplicates) 307 308 def _add_duplicate( 309 self, 310 duplicates: DuplicatesType, 311 current_path: List[str], 312 obj: Dict[str, Any], 313 value: Any, 314 key: Optional[str] = None, 315 ) -> None: 316 """ 317 Adds a duplicate record of an observed object by computing a unique hash for the provided value. 318 319 This function computes a hash for the given value (or a dictionary composed of the key and value if a key is provided) 320 and appends a tuple containing the current path, the original object, and the value to the duplicates 321 dictionary under the corresponding hash. 322 323 Parameters: 324 duplicates (DuplicatesType): The dictionary to store duplicate records. 325 current_path (List[str]): The list of keys or indices representing the current location in the object hierarchy. 326 obj (Dict): The original dictionary object where the duplicate is observed. 327 value (Any): The value to be hashed and used for identifying duplicates. 
328 key (Optional[str]): An optional key that, if provided, wraps the value in a dictionary before hashing. 329 """ 330 331 # create hash for each duplicate observed 332 value_to_hash = {key: value} if key is not None else value 333 duplicates[self._hash_object(value_to_hash)].append((current_path, obj, value)) 334 335 def _add_to_linked_definitions( 336 self, 337 type_key: str, 338 key: str, 339 value: Any, 340 ) -> None: 341 """ 342 Add a value to the linked definitions under the specified key. 343 344 Args: 345 definitions: The definitions dictionary to modify 346 key: The key to use 347 value: The value to add 348 """ 349 if type_key not in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys(): 350 self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key] = {} 351 352 if key not in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys(): 353 self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key][key] = value 354 355 def _collect_duplicates(self) -> DuplicatesType: 356 """ 357 Traverse the JSON object and collect all potential duplicate values and objects. 358 359 Returns: 360 duplicates: A dictionary of duplicate objects. 361 """ 362 363 def _collect(obj: Dict[str, Any], path: Optional[List[str]] = None) -> None: 364 """ 365 The closure to recursively collect duplicates in the JSON object. 366 367 Args: 368 obj: The current object being analyzed. 369 path: The current path in the object hierarchy. 
370 """ 371 372 if not isinstance(obj, dict): 373 return 374 375 path = [] if path is None else path 376 # Check if the object is empty 377 for key, value in obj.items(): 378 # do not collect duplicates from `definitions` tag 379 if key == DEF_TAG: 380 continue 381 382 current_path = path + [key] 383 384 if isinstance(value, dict): 385 # First process nested dictionaries 386 _collect(value, current_path) 387 # Process allowed-only component tags 388 if key in self._linkable_tags: 389 self._add_duplicate(duplicates, current_path, obj, value) 390 391 # handle primitive types 392 elif isinstance(value, (str, int, float, bool)): 393 # Process allowed-only field tags 394 if key in self._linkable_tags: 395 self._add_duplicate(duplicates, current_path, obj, value, key) 396 397 # handle list cases 398 elif isinstance(value, list): 399 for i, item in enumerate(value): 400 _collect(item, current_path + [str(i)]) 401 402 duplicates: DuplicatesType = defaultdict(list, {}) 403 try: 404 if self._linkable_tags: 405 _collect(self._normalized_manifest) 406 # clean non-duplicates and sort based on the count of occurrences 407 return self._clean_and_sort_duplicates(duplicates) 408 return duplicates 409 except Exception as e: 410 raise ManifestNormalizationException(str(e)) 411 412 def _clean_and_sort_duplicates(self, duplicates: DuplicatesType) -> DuplicatesType: 413 """ 414 Clean non-duplicates and sort the duplicates by their occurrences. 415 416 Args: 417 duplicates: The duplicates dictionary to sort 418 419 Returns: 420 A sorted duplicates dictionary. 
421 """ 422 423 # clean non-duplicates 424 duplicates = defaultdict( 425 list, 426 {k: v for k, v in duplicates.items() if len(v) >= N_OCCURANCES}, 427 ) 428 429 # sort the duplicates by their occurrences, more frequent ones go first 430 duplicates = defaultdict( 431 list, 432 {k: v for k, v in sorted(duplicates.items(), key=lambda x: len(x[1]), reverse=True)}, 433 ) 434 435 return duplicates 436 437 def _hash_object(self, obj: Dict[str, Any]) -> str: 438 """ 439 Create a unique hash for a dictionary object. 440 441 Args: 442 node: The dictionary to hash 443 444 Returns: 445 A hashed string 446 """ 447 448 # Sort keys to ensure consistent hash for same content 449 return hashlib.md5(json.dumps(obj, sort_keys=True).encode()).hexdigest() 450 451 def _is_linked_definition(self, type_key: str, key: str) -> bool: 452 """ 453 Check if the key already exists in the linked definitions. 454 455 Args: 456 key: The key to check 457 definitions: The definitions dictionary with definitions 458 459 Returns: 460 True if the key exists in the linked definitions, False otherwise 461 """ 462 463 if type_key in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys(): 464 # Check if the key exists in the linked definitions 465 if key in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys(): 466 return True 467 468 return False 469 470 def _get_linked_definition_value(self, type_key: str, key: str) -> Any: 471 """ 472 Get the value of a linked definition by its key. 473 474 Args: 475 key: The key to check 476 definitions: The definitions dictionary with definitions 477 478 Returns: 479 The value of the linked definition 480 """ 481 if type_key in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys(): 482 if key in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys(): 483 return self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key][key] 484 else: 485 raise ManifestNormalizationException( 486 f"Key {key} not found in linked definitions. Please check the manifest." 
487 ) 488 489 def _get_occurance_samples(self, occurrences: DuplicateOccurancesType) -> Tuple[str, str, Any]: 490 """ 491 Get the key from the occurrences list. 492 493 Args: 494 occurrences: The occurrences list 495 496 Returns: 497 The key, type and value from the occurrences 498 """ 499 500 # Take the value from the first occurrence, as they are the same 501 path, obj, value = occurrences[0] 502 return ( 503 obj["type"], 504 path[-1], 505 value, 506 ) # Return the component's name as the last part of its path 507 508 def _create_linked_definition_ref(self, type_key: str, key: str) -> Dict[str, str]: 509 """ 510 Create a reference object for the linked definitions using the specified key. 511 512 Args: 513 ref_key: The reference key to use 514 515 Returns: 516 A reference object in the proper format 517 """ 518 519 return {"$ref": f"#/{DEF_TAG}/{LINKED_TAG}/{type_key}/{key}"}
ManifestType =
typing.Dict[str, typing.Any]
DefinitionsType =
typing.Dict[str, typing.Any]
DuplicateOccurancesType =
typing.List[typing.Tuple[typing.List[str], typing.Dict[str, typing.Any], typing.Dict[str, typing.Any]]]
DuplicatesType =
typing.DefaultDict[str, typing.List[typing.Tuple[typing.List[str], typing.Dict[str, typing.Any], typing.Dict[str, typing.Any]]]]
N_OCCURANCES =
2
DEF_TAG =
'definitions'
LINKABLE_TAG =
'linkable'
LINKED_TAG =
'linked'
PROPERTIES_TAG =
'properties'
SCHEMA_LOADER_TAG =
'schema_loader'
SCHEMA_TAG =
'schema'
SCHEMAS_TAG =
'schemas'
STREAMS_TAG =
'streams'
class
ManifestNormalizer:
73class ManifestNormalizer: 74 """ 75 This class is responsible for normalizing the manifest by appliying processing such as: 76 - removing duplicated definitions 77 - replacing them with references. 78 79 To extend the functionality, use the `normilize()` method to include any additional processing steps. 80 """ 81 82 def __init__( 83 self, 84 resolved_manifest: ManifestType, 85 declarative_schema: DefinitionsType, 86 ) -> None: 87 self._resolved_manifest = resolved_manifest 88 self._declarative_schema = declarative_schema 89 self._normalized_manifest: ManifestType = copy.deepcopy(self._resolved_manifest) 90 # get the tags marked as `linkable` in the component schema 91 self._linkable_tags = _get_linkable_schema_tags(self._declarative_schema) 92 93 def to_json_str(self) -> str: 94 return json.dumps(self._normalized_manifest, indent=2) 95 96 def normalize(self) -> ManifestType: 97 """ 98 Normalizes the manifest by deduplicating and resolving schema references. 99 100 This method processes the manifest in two steps: 101 1. Deduplicates elements within the manifest 102 2. Resolves and references schemas 103 104 Returns: 105 ManifestType: The normalized manifest if processing succeeds, 106 or the original resolved manifest if normalization fails. 107 108 Raises: 109 ManifestNormalizationException: Caught internally and handled by returning the original manifest. 110 """ 111 try: 112 self._deduplicate_manifest() 113 114 return self._normalized_manifest 115 except ManifestNormalizationException: 116 # if any error occurs, we just return the original manifest. 117 # TODO: enable debug logging 118 return self._resolved_manifest 119 120 def _get_manifest_streams(self) -> Iterable[Dict[str, Any]]: 121 """ 122 Get the streams from the manifest. 123 124 Returns: 125 An Iterable of streams. 
126 """ 127 128 if STREAMS_TAG in self._normalized_manifest.keys(): 129 for stream in self._normalized_manifest[STREAMS_TAG]: 130 yield stream 131 132 yield from [] 133 134 def _deduplicate_manifest(self) -> None: 135 """ 136 Find commonalities in the input JSON structure and refactor it to avoid redundancy. 137 """ 138 139 try: 140 # prepare the `definitions` tag 141 self._prepare_definitions() 142 # replace duplicates with references, if any 143 self._handle_duplicates(self._collect_duplicates()) 144 # replace parent streams with $refs 145 self._replace_parent_streams_with_refs() 146 # clean dangling fields after resolving $refs 147 self._clean_dangling_fields() 148 except Exception as e: 149 raise ManifestNormalizationException(str(e)) 150 151 def _replace_parent_streams_with_refs(self) -> None: 152 """ 153 For each stream in the manifest, if it has a retriever.partition_router with parent_stream_configs, 154 replace any 'stream' fields in those configs that are dicts and deeply equal to another stream object 155 with a $ref to the correct stream index. 
156 """ 157 streams = self._normalized_manifest.get(STREAMS_TAG, []) 158 159 # Build a hash-to-index mapping for O(1) lookups 160 stream_hash_to_index = {} 161 for idx, stream in enumerate(streams): 162 stream_hash = self._hash_object(stream) 163 stream_hash_to_index[stream_hash] = idx 164 165 for idx, stream in enumerate(streams): 166 retriever = stream.get("retriever") 167 if not retriever: 168 continue 169 partition_router = retriever.get("partition_router") 170 routers = ( 171 partition_router 172 if isinstance(partition_router, list) 173 else [partition_router] 174 if partition_router 175 else [] 176 ) 177 for router in routers: 178 if not isinstance(router, dict): 179 continue 180 if router.get("type") != "SubstreamPartitionRouter": 181 continue 182 parent_stream_configs = router.get("parent_stream_configs", []) 183 for parent_config in parent_stream_configs: 184 if not isinstance(parent_config, dict): 185 continue 186 stream_ref = parent_config.get("stream") 187 # Only replace if it's a dict and matches any stream in the manifest 188 if stream_ref is not None and isinstance(stream_ref, dict): 189 stream_ref_hash = self._hash_object(stream_ref) 190 if stream_ref_hash in stream_hash_to_index: 191 parent_config["stream"] = { 192 "$ref": f"#/streams/{stream_hash_to_index[stream_ref_hash]}" 193 } 194 195 def _clean_dangling_fields(self) -> None: 196 """ 197 Clean the manifest by removing unused definitions and schemas. 198 This method removes any definitions or schemas that are not referenced by any $ref in the manifest. 199 """ 200 201 def find_all_refs(obj: Dict[str, Any], refs: List[str]) -> None: 202 """ 203 Recursively find all $ref paths in the object. 
204 205 Args: 206 obj: The object to search through 207 refs: List to store found reference paths 208 """ 209 if not isinstance(obj, dict): 210 return 211 212 for key, value in obj.items(): 213 if key == "$ref" and isinstance(value, str): 214 # Remove the leading #/ from the ref path 215 refs.append(value[2:]) 216 elif isinstance(value, dict): 217 find_all_refs(value, refs) 218 elif isinstance(value, list): 219 for item in value: 220 if isinstance(item, dict): 221 find_all_refs(item, refs) 222 223 def clean_section(section: Dict[str, Any], section_path: str) -> None: 224 """ 225 Clean a section by removing unreferenced fields. 226 227 Args: 228 section: The section to clean 229 section_path: The path to this section in the manifest 230 """ 231 for key in list(section.keys()): 232 current_path = f"{section_path}/{key}" 233 # Check if this path is referenced or is a parent of a referenced path 234 if not any(ref.startswith(current_path) for ref in all_refs): 235 del section[key] 236 237 # Find all references in the manifest 238 all_refs: List[str] = [] 239 find_all_refs(self._normalized_manifest, all_refs) 240 241 # Clean definitions 242 if DEF_TAG in self._normalized_manifest: 243 clean_section(self._normalized_manifest[DEF_TAG], DEF_TAG) 244 # Remove empty definitions section 245 if not self._normalized_manifest[DEF_TAG]: 246 del self._normalized_manifest[DEF_TAG] 247 248 # Clean schemas 249 if SCHEMAS_TAG in self._normalized_manifest: 250 clean_section(self._normalized_manifest[SCHEMAS_TAG], SCHEMAS_TAG) 251 # Remove empty schemas section 252 if not self._normalized_manifest[SCHEMAS_TAG]: 253 del self._normalized_manifest[SCHEMAS_TAG] 254 255 def _prepare_definitions(self) -> None: 256 """ 257 Clean the definitions in the manifest by removing unnecessary properties. 258 This function modifies the manifest in place. 
259 """ 260 261 # Check if the definitions tag exists 262 if not DEF_TAG in self._normalized_manifest: 263 self._normalized_manifest[DEF_TAG] = {} 264 265 # Check if the linked tag exists 266 if not LINKED_TAG in self._normalized_manifest[DEF_TAG]: 267 self._normalized_manifest[DEF_TAG][LINKED_TAG] = {} 268 269 # remove everything from definitions tag except of `linked`, after processing 270 for key in list(self._normalized_manifest[DEF_TAG].keys()): 271 if key != LINKED_TAG: 272 self._normalized_manifest[DEF_TAG].pop(key, None) 273 274 def _replace_duplicates_with_refs(self, duplicates: DuplicatesType) -> None: 275 """ 276 Process duplicate objects and replace them with references. 277 278 Args: 279 duplicates: The duplicates dictionary collected from the given manifest. 280 """ 281 282 for _, occurrences in duplicates.items(): 283 type_key, key, value = self._get_occurance_samples(occurrences) 284 is_linked_def = self._is_linked_definition(type_key, key) 285 286 # Add to definitions if not there already 287 if not is_linked_def: 288 self._add_to_linked_definitions(type_key, key, value) 289 290 # Replace occurrences with references 291 for _, parent_obj, value in occurrences: 292 if is_linked_def: 293 if value == self._get_linked_definition_value(type_key, key): 294 parent_obj[key] = self._create_linked_definition_ref(type_key, key) 295 else: 296 parent_obj[key] = self._create_linked_definition_ref(type_key, key) 297 298 def _handle_duplicates(self, duplicates: DuplicatesType) -> None: 299 """ 300 Process the duplicates and replace them with references. 301 302 Args: 303 duplicates: The duplicates dictionary collected from the given manifest. 
304 """ 305 306 if len(duplicates) > 0: 307 self._replace_duplicates_with_refs(duplicates) 308 309 def _add_duplicate( 310 self, 311 duplicates: DuplicatesType, 312 current_path: List[str], 313 obj: Dict[str, Any], 314 value: Any, 315 key: Optional[str] = None, 316 ) -> None: 317 """ 318 Adds a duplicate record of an observed object by computing a unique hash for the provided value. 319 320 This function computes a hash for the given value (or a dictionary composed of the key and value if a key is provided) 321 and appends a tuple containing the current path, the original object, and the value to the duplicates 322 dictionary under the corresponding hash. 323 324 Parameters: 325 duplicates (DuplicatesType): The dictionary to store duplicate records. 326 current_path (List[str]): The list of keys or indices representing the current location in the object hierarchy. 327 obj (Dict): The original dictionary object where the duplicate is observed. 328 value (Any): The value to be hashed and used for identifying duplicates. 329 key (Optional[str]): An optional key that, if provided, wraps the value in a dictionary before hashing. 330 """ 331 332 # create hash for each duplicate observed 333 value_to_hash = {key: value} if key is not None else value 334 duplicates[self._hash_object(value_to_hash)].append((current_path, obj, value)) 335 336 def _add_to_linked_definitions( 337 self, 338 type_key: str, 339 key: str, 340 value: Any, 341 ) -> None: 342 """ 343 Add a value to the linked definitions under the specified key. 
344 345 Args: 346 definitions: The definitions dictionary to modify 347 key: The key to use 348 value: The value to add 349 """ 350 if type_key not in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys(): 351 self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key] = {} 352 353 if key not in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys(): 354 self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key][key] = value 355 356 def _collect_duplicates(self) -> DuplicatesType: 357 """ 358 Traverse the JSON object and collect all potential duplicate values and objects. 359 360 Returns: 361 duplicates: A dictionary of duplicate objects. 362 """ 363 364 def _collect(obj: Dict[str, Any], path: Optional[List[str]] = None) -> None: 365 """ 366 The closure to recursively collect duplicates in the JSON object. 367 368 Args: 369 obj: The current object being analyzed. 370 path: The current path in the object hierarchy. 371 """ 372 373 if not isinstance(obj, dict): 374 return 375 376 path = [] if path is None else path 377 # Check if the object is empty 378 for key, value in obj.items(): 379 # do not collect duplicates from `definitions` tag 380 if key == DEF_TAG: 381 continue 382 383 current_path = path + [key] 384 385 if isinstance(value, dict): 386 # First process nested dictionaries 387 _collect(value, current_path) 388 # Process allowed-only component tags 389 if key in self._linkable_tags: 390 self._add_duplicate(duplicates, current_path, obj, value) 391 392 # handle primitive types 393 elif isinstance(value, (str, int, float, bool)): 394 # Process allowed-only field tags 395 if key in self._linkable_tags: 396 self._add_duplicate(duplicates, current_path, obj, value, key) 397 398 # handle list cases 399 elif isinstance(value, list): 400 for i, item in enumerate(value): 401 _collect(item, current_path + [str(i)]) 402 403 duplicates: DuplicatesType = defaultdict(list, {}) 404 try: 405 if self._linkable_tags: 406 _collect(self._normalized_manifest) 407 # clean 
non-duplicates and sort based on the count of occurrences 408 return self._clean_and_sort_duplicates(duplicates) 409 return duplicates 410 except Exception as e: 411 raise ManifestNormalizationException(str(e)) 412 413 def _clean_and_sort_duplicates(self, duplicates: DuplicatesType) -> DuplicatesType: 414 """ 415 Clean non-duplicates and sort the duplicates by their occurrences. 416 417 Args: 418 duplicates: The duplicates dictionary to sort 419 420 Returns: 421 A sorted duplicates dictionary. 422 """ 423 424 # clean non-duplicates 425 duplicates = defaultdict( 426 list, 427 {k: v for k, v in duplicates.items() if len(v) >= N_OCCURANCES}, 428 ) 429 430 # sort the duplicates by their occurrences, more frequent ones go first 431 duplicates = defaultdict( 432 list, 433 {k: v for k, v in sorted(duplicates.items(), key=lambda x: len(x[1]), reverse=True)}, 434 ) 435 436 return duplicates 437 438 def _hash_object(self, obj: Dict[str, Any]) -> str: 439 """ 440 Create a unique hash for a dictionary object. 441 442 Args: 443 node: The dictionary to hash 444 445 Returns: 446 A hashed string 447 """ 448 449 # Sort keys to ensure consistent hash for same content 450 return hashlib.md5(json.dumps(obj, sort_keys=True).encode()).hexdigest() 451 452 def _is_linked_definition(self, type_key: str, key: str) -> bool: 453 """ 454 Check if the key already exists in the linked definitions. 455 456 Args: 457 key: The key to check 458 definitions: The definitions dictionary with definitions 459 460 Returns: 461 True if the key exists in the linked definitions, False otherwise 462 """ 463 464 if type_key in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys(): 465 # Check if the key exists in the linked definitions 466 if key in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys(): 467 return True 468 469 return False 470 471 def _get_linked_definition_value(self, type_key: str, key: str) -> Any: 472 """ 473 Get the value of a linked definition by its key. 
474 475 Args: 476 key: The key to check 477 definitions: The definitions dictionary with definitions 478 479 Returns: 480 The value of the linked definition 481 """ 482 if type_key in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys(): 483 if key in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys(): 484 return self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key][key] 485 else: 486 raise ManifestNormalizationException( 487 f"Key {key} not found in linked definitions. Please check the manifest." 488 ) 489 490 def _get_occurance_samples(self, occurrences: DuplicateOccurancesType) -> Tuple[str, str, Any]: 491 """ 492 Get the key from the occurrences list. 493 494 Args: 495 occurrences: The occurrences list 496 497 Returns: 498 The key, type and value from the occurrences 499 """ 500 501 # Take the value from the first occurrence, as they are the same 502 path, obj, value = occurrences[0] 503 return ( 504 obj["type"], 505 path[-1], 506 value, 507 ) # Return the component's name as the last part of its path 508 509 def _create_linked_definition_ref(self, type_key: str, key: str) -> Dict[str, str]: 510 """ 511 Create a reference object for the linked definitions using the specified key. 512 513 Args: 514 ref_key: The reference key to use 515 516 Returns: 517 A reference object in the proper format 518 """ 519 520 return {"$ref": f"#/{DEF_TAG}/{LINKED_TAG}/{type_key}/{key}"}
This class is responsible for normalizing the manifest by applying processing such as:
- removing duplicated definitions
- replacing them with references.
To extend the functionality, use the normalize()
method to include any additional processing steps.
ManifestNormalizer( resolved_manifest: Dict[str, Any], declarative_schema: Dict[str, Any])
82 def __init__( 83 self, 84 resolved_manifest: ManifestType, 85 declarative_schema: DefinitionsType, 86 ) -> None: 87 self._resolved_manifest = resolved_manifest 88 self._declarative_schema = declarative_schema 89 self._normalized_manifest: ManifestType = copy.deepcopy(self._resolved_manifest) 90 # get the tags marked as `linkable` in the component schema 91 self._linkable_tags = _get_linkable_schema_tags(self._declarative_schema)
def
normalize(self) -> Dict[str, Any]:
96 def normalize(self) -> ManifestType: 97 """ 98 Normalizes the manifest by deduplicating and resolving schema references. 99 100 This method processes the manifest in two steps: 101 1. Deduplicates elements within the manifest 102 2. Resolves and references schemas 103 104 Returns: 105 ManifestType: The normalized manifest if processing succeeds, 106 or the original resolved manifest if normalization fails. 107 108 Raises: 109 ManifestNormalizationException: Caught internally and handled by returning the original manifest. 110 """ 111 try: 112 self._deduplicate_manifest() 113 114 return self._normalized_manifest 115 except ManifestNormalizationException: 116 # if any error occurs, we just return the original manifest. 117 # TODO: enable debug logging 118 return self._resolved_manifest
Normalizes the manifest by deduplicating and resolving schema references.
This method processes the manifest in two steps:
- Deduplicates elements within the manifest
- Resolves and references schemas
Returns:
ManifestType: The normalized manifest if processing succeeds, or the original resolved manifest if normalization fails.
Raises:
- ManifestNormalizationException: Caught internally and handled by returning the original manifest.