airbyte_cdk.sources.declarative.parsers.manifest_normalizer

  1#
  2# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  3#
  4
  5import copy
  6import hashlib
  7import json
  8from collections import defaultdict
  9from itertools import chain
 10from typing import Any, Callable, DefaultDict, Dict, Iterable, List, Optional, Tuple
 11
 12from airbyte_cdk.sources.declarative.parsers.custom_exceptions import ManifestNormalizationException
 13
# Type definitions for better readability
# A fully resolved declarative manifest (JSON-like mapping).
ManifestType = Dict[str, Any]
# The declarative component schema (or its `definitions` portion).
DefinitionsType = Dict[str, Any]
# One duplicate occurrence: (path in the manifest, parent object, duplicated value).
DuplicateOccurancesType = List[Tuple[List[str], Dict[str, Any], Dict[str, Any]]]
# All occurrences of duplicated values, grouped by the value's content hash.
DuplicatesType = DefaultDict[str, DuplicateOccurancesType]

# Configuration constants
# Minimum number of occurrences for a value to be treated as a duplicate.
N_OCCURANCES = 2

# Well-known manifest / schema keys.
DEF_TAG = "definitions"
LINKABLE_TAG = "linkable"
LINKED_TAG = "linked"
PROPERTIES_TAG = "properties"
SCHEMA_LOADER_TAG = "schema_loader"
SCHEMA_TAG = "schema"
SCHEMAS_TAG = "schemas"
STREAMS_TAG = "streams"
 31
 32
 33def _get_linkable_schema_tags(schema: DefinitionsType) -> List[str]:
 34    """
 35    Extracts linkable tags from schema definitions.
 36    This function identifies properties within a schema's definitions that are marked as linkable.
 37    It traverses through each definition in the schema, examines its properties, and collects
 38    the keys of properties that contain the LINKABLE_TAG.
 39
 40    Args:
 41        schema (DefinitionsType): The schema definition dictionary to process
 42
 43    Returns:
 44        List[str]: A deduplicated list of property keys that are marked as linkable
 45    """
 46
 47    # the linkable scope: ['definitions.*']
 48    schema_definitions = schema.get(DEF_TAG, {})
 49
 50    linkable_tags: List[str] = []
 51    # Extract linkable keys from properties
 52
 53    extract_linkable_keys: Callable[[Dict[str, Dict[str, Any]]], List[str]] = lambda properties: [
 54        key for key, value in properties.items() if LINKABLE_TAG in value.keys()
 55    ]
 56
 57    # Process each root value to get its linkable keys
 58    process_root: Callable[[Dict[str, Any]], List[str]] = lambda root_value: extract_linkable_keys(
 59        root_value.get(PROPERTIES_TAG, {})
 60    )
 61
 62    # Map the process_root function over all schema values and flatten the results
 63    all_linkable_tags = chain.from_iterable(map(process_root, schema_definitions.values()))
 64
 65    # Add all found linkable tags to the tags list
 66    linkable_tags.extend(all_linkable_tags)
 67
 68    # return unique tags only
 69    return list(set(linkable_tags))
 70
 71
 72class ManifestNormalizer:
 73    """
 74    This class is responsible for normalizing the manifest by appliying processing such as:
 75     - removing duplicated definitions
 76     - replacing them with references.
 77
 78    To extend the functionality, use the `normilize()` method to include any additional processing steps.
 79    """
 80
 81    def __init__(
 82        self,
 83        resolved_manifest: ManifestType,
 84        declarative_schema: DefinitionsType,
 85    ) -> None:
 86        self._resolved_manifest = resolved_manifest
 87        self._declarative_schema = declarative_schema
 88        self._normalized_manifest: ManifestType = copy.deepcopy(self._resolved_manifest)
 89        # get the tags marked as `linkable` in the component schema
 90        self._linkable_tags = _get_linkable_schema_tags(self._declarative_schema)
 91
 92    def to_json_str(self) -> str:
 93        return json.dumps(self._normalized_manifest, indent=2)
 94
 95    def normalize(self) -> ManifestType:
 96        """
 97        Normalizes the manifest by deduplicating and resolving schema references.
 98
 99        This method processes the manifest in two steps:
100        1. Deduplicates elements within the manifest
101        2. Resolves and references schemas
102
103        Returns:
104            ManifestType: The normalized manifest if processing succeeds,
105                          or the original resolved manifest if normalization fails.
106
107        Raises:
108            ManifestNormalizationException: Caught internally and handled by returning the original manifest.
109        """
110        try:
111            self._deduplicate_manifest()
112
113            return self._normalized_manifest
114        except ManifestNormalizationException:
115            # if any error occurs, we just return the original manifest.
116            # TODO: enable debug logging
117            return self._resolved_manifest
118
119    def _get_manifest_streams(self) -> Iterable[Dict[str, Any]]:
120        """
121        Get the streams from the manifest.
122
123        Returns:
124            An Iterable of streams.
125        """
126
127        if STREAMS_TAG in self._normalized_manifest.keys():
128            for stream in self._normalized_manifest[STREAMS_TAG]:
129                yield stream
130
131        yield from []
132
133    def _deduplicate_manifest(self) -> None:
134        """
135        Find commonalities in the input JSON structure and refactor it to avoid redundancy.
136        """
137
138        try:
139            # prepare the `definitions` tag
140            self._prepare_definitions()
141            # replace duplicates with references, if any
142            self._handle_duplicates(self._collect_duplicates())
143            # replace parent streams with $refs
144            self._replace_parent_streams_with_refs()
145            # clean dangling fields after resolving $refs
146            self._clean_dangling_fields()
147        except Exception as e:
148            raise ManifestNormalizationException(str(e))
149
150    def _replace_parent_streams_with_refs(self) -> None:
151        """
152        For each stream in the manifest, if it has a retriever.partition_router with parent_stream_configs,
153        replace any 'stream' fields in those configs that are dicts and deeply equal to another stream object
154        with a $ref to the correct stream index.
155        """
156        streams = self._normalized_manifest.get(STREAMS_TAG, [])
157
158        # Build a hash-to-index mapping for O(1) lookups
159        stream_hash_to_index = {}
160        for idx, stream in enumerate(streams):
161            stream_hash = self._hash_object(stream)
162            stream_hash_to_index[stream_hash] = idx
163
164        for idx, stream in enumerate(streams):
165            retriever = stream.get("retriever")
166            if not retriever:
167                continue
168            partition_router = retriever.get("partition_router")
169            routers = (
170                partition_router
171                if isinstance(partition_router, list)
172                else [partition_router]
173                if partition_router
174                else []
175            )
176            for router in routers:
177                if not isinstance(router, dict):
178                    continue
179                if router.get("type") != "SubstreamPartitionRouter":
180                    continue
181                parent_stream_configs = router.get("parent_stream_configs", [])
182                for parent_config in parent_stream_configs:
183                    if not isinstance(parent_config, dict):
184                        continue
185                    stream_ref = parent_config.get("stream")
186                    # Only replace if it's a dict and matches any stream in the manifest
187                    if stream_ref is not None and isinstance(stream_ref, dict):
188                        stream_ref_hash = self._hash_object(stream_ref)
189                        if stream_ref_hash in stream_hash_to_index:
190                            parent_config["stream"] = {
191                                "$ref": f"#/streams/{stream_hash_to_index[stream_ref_hash]}"
192                            }
193
194    def _clean_dangling_fields(self) -> None:
195        """
196        Clean the manifest by removing unused definitions and schemas.
197        This method removes any definitions or schemas that are not referenced by any $ref in the manifest.
198        """
199
200        def find_all_refs(obj: Dict[str, Any], refs: List[str]) -> None:
201            """
202            Recursively find all $ref paths in the object.
203
204            Args:
205                obj: The object to search through
206                refs: List to store found reference paths
207            """
208            if not isinstance(obj, dict):
209                return
210
211            for key, value in obj.items():
212                if key == "$ref" and isinstance(value, str):
213                    # Remove the leading #/ from the ref path
214                    refs.append(value[2:])
215                elif isinstance(value, dict):
216                    find_all_refs(value, refs)
217                elif isinstance(value, list):
218                    for item in value:
219                        if isinstance(item, dict):
220                            find_all_refs(item, refs)
221
222        def clean_section(section: Dict[str, Any], section_path: str) -> None:
223            """
224            Clean a section by removing unreferenced fields.
225
226            Args:
227                section: The section to clean
228                section_path: The path to this section in the manifest
229            """
230            for key in list(section.keys()):
231                current_path = f"{section_path}/{key}"
232                # Check if this path is referenced or is a parent of a referenced path
233                if not any(ref.startswith(current_path) for ref in all_refs):
234                    del section[key]
235
236        # Find all references in the manifest
237        all_refs: List[str] = []
238        find_all_refs(self._normalized_manifest, all_refs)
239
240        # Clean definitions
241        if DEF_TAG in self._normalized_manifest:
242            clean_section(self._normalized_manifest[DEF_TAG], DEF_TAG)
243            # Remove empty definitions section
244            if not self._normalized_manifest[DEF_TAG]:
245                del self._normalized_manifest[DEF_TAG]
246
247        # Clean schemas
248        if SCHEMAS_TAG in self._normalized_manifest:
249            clean_section(self._normalized_manifest[SCHEMAS_TAG], SCHEMAS_TAG)
250            # Remove empty schemas section
251            if not self._normalized_manifest[SCHEMAS_TAG]:
252                del self._normalized_manifest[SCHEMAS_TAG]
253
254    def _prepare_definitions(self) -> None:
255        """
256        Clean the definitions in the manifest by removing unnecessary properties.
257        This function modifies the manifest in place.
258        """
259
260        # Check if the definitions tag exists
261        if not DEF_TAG in self._normalized_manifest:
262            self._normalized_manifest[DEF_TAG] = {}
263
264        # Check if the linked tag exists
265        if not LINKED_TAG in self._normalized_manifest[DEF_TAG]:
266            self._normalized_manifest[DEF_TAG][LINKED_TAG] = {}
267
268        # remove everything from definitions tag except of `linked`, after processing
269        for key in list(self._normalized_manifest[DEF_TAG].keys()):
270            if key != LINKED_TAG:
271                self._normalized_manifest[DEF_TAG].pop(key, None)
272
273    def _replace_duplicates_with_refs(self, duplicates: DuplicatesType) -> None:
274        """
275        Process duplicate objects and replace them with references.
276
277        Args:
278            duplicates: The duplicates dictionary collected from the given manifest.
279        """
280
281        for _, occurrences in duplicates.items():
282            type_key, key, value = self._get_occurance_samples(occurrences)
283            is_linked_def = self._is_linked_definition(type_key, key)
284
285            # Add to definitions if not there already
286            if not is_linked_def:
287                self._add_to_linked_definitions(type_key, key, value)
288
289            # Replace occurrences with references
290            for _, parent_obj, value in occurrences:
291                if is_linked_def:
292                    if value == self._get_linked_definition_value(type_key, key):
293                        parent_obj[key] = self._create_linked_definition_ref(type_key, key)
294                else:
295                    parent_obj[key] = self._create_linked_definition_ref(type_key, key)
296
297    def _handle_duplicates(self, duplicates: DuplicatesType) -> None:
298        """
299        Process the duplicates and replace them with references.
300
301        Args:
302            duplicates: The duplicates dictionary collected from the given manifest.
303        """
304
305        if len(duplicates) > 0:
306            self._replace_duplicates_with_refs(duplicates)
307
308    def _add_duplicate(
309        self,
310        duplicates: DuplicatesType,
311        current_path: List[str],
312        obj: Dict[str, Any],
313        value: Any,
314        key: Optional[str] = None,
315    ) -> None:
316        """
317        Adds a duplicate record of an observed object by computing a unique hash for the provided value.
318
319        This function computes a hash for the given value (or a dictionary composed of the key and value if a key is provided)
320        and appends a tuple containing the current path, the original object, and the value to the duplicates
321        dictionary under the corresponding hash.
322
323        Parameters:
324            duplicates (DuplicatesType): The dictionary to store duplicate records.
325            current_path (List[str]): The list of keys or indices representing the current location in the object hierarchy.
326            obj (Dict): The original dictionary object where the duplicate is observed.
327            value (Any): The value to be hashed and used for identifying duplicates.
328            key (Optional[str]): An optional key that, if provided, wraps the value in a dictionary before hashing.
329        """
330
331        # create hash for each duplicate observed
332        value_to_hash = {key: value} if key is not None else value
333        duplicates[self._hash_object(value_to_hash)].append((current_path, obj, value))
334
335    def _add_to_linked_definitions(
336        self,
337        type_key: str,
338        key: str,
339        value: Any,
340    ) -> None:
341        """
342        Add a value to the linked definitions under the specified key.
343
344        Args:
345            definitions: The definitions dictionary to modify
346            key: The key to use
347            value: The value to add
348        """
349        if type_key not in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys():
350            self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key] = {}
351
352        if key not in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys():
353            self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key][key] = value
354
355    def _collect_duplicates(self) -> DuplicatesType:
356        """
357        Traverse the JSON object and collect all potential duplicate values and objects.
358
359        Returns:
360            duplicates: A dictionary of duplicate objects.
361        """
362
363        def _collect(obj: Dict[str, Any], path: Optional[List[str]] = None) -> None:
364            """
365            The closure to recursively collect duplicates in the JSON object.
366
367            Args:
368                obj: The current object being analyzed.
369                path: The current path in the object hierarchy.
370            """
371
372            if not isinstance(obj, dict):
373                return
374
375            path = [] if path is None else path
376            # Check if the object is empty
377            for key, value in obj.items():
378                # do not collect duplicates from `definitions` tag
379                if key == DEF_TAG:
380                    continue
381
382                current_path = path + [key]
383
384                if isinstance(value, dict):
385                    # First process nested dictionaries
386                    _collect(value, current_path)
387                    # Process allowed-only component tags
388                    if key in self._linkable_tags:
389                        self._add_duplicate(duplicates, current_path, obj, value)
390
391                # handle primitive types
392                elif isinstance(value, (str, int, float, bool)):
393                    # Process allowed-only field tags
394                    if key in self._linkable_tags:
395                        self._add_duplicate(duplicates, current_path, obj, value, key)
396
397                # handle list cases
398                elif isinstance(value, list):
399                    for i, item in enumerate(value):
400                        _collect(item, current_path + [str(i)])
401
402        duplicates: DuplicatesType = defaultdict(list, {})
403        try:
404            if self._linkable_tags:
405                _collect(self._normalized_manifest)
406                # clean non-duplicates and sort based on the count of occurrences
407                return self._clean_and_sort_duplicates(duplicates)
408            return duplicates
409        except Exception as e:
410            raise ManifestNormalizationException(str(e))
411
412    def _clean_and_sort_duplicates(self, duplicates: DuplicatesType) -> DuplicatesType:
413        """
414        Clean non-duplicates and sort the duplicates by their occurrences.
415
416        Args:
417            duplicates: The duplicates dictionary to sort
418
419        Returns:
420            A sorted duplicates dictionary.
421        """
422
423        # clean non-duplicates
424        duplicates = defaultdict(
425            list,
426            {k: v for k, v in duplicates.items() if len(v) >= N_OCCURANCES},
427        )
428
429        # sort the duplicates by their occurrences, more frequent ones go first
430        duplicates = defaultdict(
431            list,
432            {k: v for k, v in sorted(duplicates.items(), key=lambda x: len(x[1]), reverse=True)},
433        )
434
435        return duplicates
436
437    def _hash_object(self, obj: Dict[str, Any]) -> str:
438        """
439        Create a unique hash for a dictionary object.
440
441        Args:
442            node: The dictionary to hash
443
444        Returns:
445            A hashed string
446        """
447
448        # Sort keys to ensure consistent hash for same content
449        return hashlib.md5(json.dumps(obj, sort_keys=True).encode()).hexdigest()
450
451    def _is_linked_definition(self, type_key: str, key: str) -> bool:
452        """
453        Check if the key already exists in the linked definitions.
454
455        Args:
456            key: The key to check
457            definitions: The definitions dictionary with definitions
458
459        Returns:
460            True if the key exists in the linked definitions, False otherwise
461        """
462
463        if type_key in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys():
464            # Check if the key exists in the linked definitions
465            if key in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys():
466                return True
467
468        return False
469
470    def _get_linked_definition_value(self, type_key: str, key: str) -> Any:
471        """
472        Get the value of a linked definition by its key.
473
474        Args:
475            key: The key to check
476            definitions: The definitions dictionary with definitions
477
478        Returns:
479            The value of the linked definition
480        """
481        if type_key in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys():
482            if key in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys():
483                return self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key][key]
484        else:
485            raise ManifestNormalizationException(
486                f"Key {key} not found in linked definitions. Please check the manifest."
487            )
488
489    def _get_occurance_samples(self, occurrences: DuplicateOccurancesType) -> Tuple[str, str, Any]:
490        """
491        Get the key from the occurrences list.
492
493        Args:
494            occurrences: The occurrences list
495
496        Returns:
497            The key, type and value from the occurrences
498        """
499
500        # Take the value from the first occurrence, as they are the same
501        path, obj, value = occurrences[0]
502        return (
503            obj["type"],
504            path[-1],
505            value,
506        )  # Return the component's name as the last part of its path
507
508    def _create_linked_definition_ref(self, type_key: str, key: str) -> Dict[str, str]:
509        """
510        Create a reference object for the linked definitions using the specified key.
511
512        Args:
513            ref_key: The reference key to use
514
515        Returns:
516            A reference object in the proper format
517        """
518
519        return {"$ref": f"#/{DEF_TAG}/{LINKED_TAG}/{type_key}/{key}"}
# NOTE(review): this trailing section duplicates the alias/constant declarations
# from the top of the file (it appears to be generated residue). It originally
# used `typing.Dict`/`typing.List`/... although the bare `typing` module is never
# imported here (only specific names are) — which would raise NameError at
# runtime. Rewritten against the names already imported from `typing`.
ManifestType = Dict[str, Any]
DefinitionsType = Dict[str, Any]
DuplicateOccurancesType = List[Tuple[List[str], Dict[str, Any], Dict[str, Any]]]
DuplicatesType = DefaultDict[str, DuplicateOccurancesType]
N_OCCURANCES = 2
DEF_TAG = "definitions"
LINKABLE_TAG = "linkable"
LINKED_TAG = "linked"
PROPERTIES_TAG = "properties"
SCHEMA_LOADER_TAG = "schema_loader"
SCHEMA_TAG = "schema"
SCHEMAS_TAG = "schemas"
STREAMS_TAG = "streams"
class ManifestNormalizer:
 73class ManifestNormalizer:
 74    """
 75    This class is responsible for normalizing the manifest by appliying processing such as:
 76     - removing duplicated definitions
 77     - replacing them with references.
 78
 79    To extend the functionality, use the `normilize()` method to include any additional processing steps.
 80    """
 81
 82    def __init__(
 83        self,
 84        resolved_manifest: ManifestType,
 85        declarative_schema: DefinitionsType,
 86    ) -> None:
 87        self._resolved_manifest = resolved_manifest
 88        self._declarative_schema = declarative_schema
 89        self._normalized_manifest: ManifestType = copy.deepcopy(self._resolved_manifest)
 90        # get the tags marked as `linkable` in the component schema
 91        self._linkable_tags = _get_linkable_schema_tags(self._declarative_schema)
 92
 93    def to_json_str(self) -> str:
 94        return json.dumps(self._normalized_manifest, indent=2)
 95
 96    def normalize(self) -> ManifestType:
 97        """
 98        Normalizes the manifest by deduplicating and resolving schema references.
 99
100        This method processes the manifest in two steps:
101        1. Deduplicates elements within the manifest
102        2. Resolves and references schemas
103
104        Returns:
105            ManifestType: The normalized manifest if processing succeeds,
106                          or the original resolved manifest if normalization fails.
107
108        Raises:
109            ManifestNormalizationException: Caught internally and handled by returning the original manifest.
110        """
111        try:
112            self._deduplicate_manifest()
113
114            return self._normalized_manifest
115        except ManifestNormalizationException:
116            # if any error occurs, we just return the original manifest.
117            # TODO: enable debug logging
118            return self._resolved_manifest
119
120    def _get_manifest_streams(self) -> Iterable[Dict[str, Any]]:
121        """
122        Get the streams from the manifest.
123
124        Returns:
125            An Iterable of streams.
126        """
127
128        if STREAMS_TAG in self._normalized_manifest.keys():
129            for stream in self._normalized_manifest[STREAMS_TAG]:
130                yield stream
131
132        yield from []
133
134    def _deduplicate_manifest(self) -> None:
135        """
136        Find commonalities in the input JSON structure and refactor it to avoid redundancy.
137        """
138
139        try:
140            # prepare the `definitions` tag
141            self._prepare_definitions()
142            # replace duplicates with references, if any
143            self._handle_duplicates(self._collect_duplicates())
144            # replace parent streams with $refs
145            self._replace_parent_streams_with_refs()
146            # clean dangling fields after resolving $refs
147            self._clean_dangling_fields()
148        except Exception as e:
149            raise ManifestNormalizationException(str(e))
150
151    def _replace_parent_streams_with_refs(self) -> None:
152        """
153        For each stream in the manifest, if it has a retriever.partition_router with parent_stream_configs,
154        replace any 'stream' fields in those configs that are dicts and deeply equal to another stream object
155        with a $ref to the correct stream index.
156        """
157        streams = self._normalized_manifest.get(STREAMS_TAG, [])
158
159        # Build a hash-to-index mapping for O(1) lookups
160        stream_hash_to_index = {}
161        for idx, stream in enumerate(streams):
162            stream_hash = self._hash_object(stream)
163            stream_hash_to_index[stream_hash] = idx
164
165        for idx, stream in enumerate(streams):
166            retriever = stream.get("retriever")
167            if not retriever:
168                continue
169            partition_router = retriever.get("partition_router")
170            routers = (
171                partition_router
172                if isinstance(partition_router, list)
173                else [partition_router]
174                if partition_router
175                else []
176            )
177            for router in routers:
178                if not isinstance(router, dict):
179                    continue
180                if router.get("type") != "SubstreamPartitionRouter":
181                    continue
182                parent_stream_configs = router.get("parent_stream_configs", [])
183                for parent_config in parent_stream_configs:
184                    if not isinstance(parent_config, dict):
185                        continue
186                    stream_ref = parent_config.get("stream")
187                    # Only replace if it's a dict and matches any stream in the manifest
188                    if stream_ref is not None and isinstance(stream_ref, dict):
189                        stream_ref_hash = self._hash_object(stream_ref)
190                        if stream_ref_hash in stream_hash_to_index:
191                            parent_config["stream"] = {
192                                "$ref": f"#/streams/{stream_hash_to_index[stream_ref_hash]}"
193                            }
194
195    def _clean_dangling_fields(self) -> None:
196        """
197        Clean the manifest by removing unused definitions and schemas.
198        This method removes any definitions or schemas that are not referenced by any $ref in the manifest.
199        """
200
201        def find_all_refs(obj: Dict[str, Any], refs: List[str]) -> None:
202            """
203            Recursively find all $ref paths in the object.
204
205            Args:
206                obj: The object to search through
207                refs: List to store found reference paths
208            """
209            if not isinstance(obj, dict):
210                return
211
212            for key, value in obj.items():
213                if key == "$ref" and isinstance(value, str):
214                    # Remove the leading #/ from the ref path
215                    refs.append(value[2:])
216                elif isinstance(value, dict):
217                    find_all_refs(value, refs)
218                elif isinstance(value, list):
219                    for item in value:
220                        if isinstance(item, dict):
221                            find_all_refs(item, refs)
222
223        def clean_section(section: Dict[str, Any], section_path: str) -> None:
224            """
225            Clean a section by removing unreferenced fields.
226
227            Args:
228                section: The section to clean
229                section_path: The path to this section in the manifest
230            """
231            for key in list(section.keys()):
232                current_path = f"{section_path}/{key}"
233                # Check if this path is referenced or is a parent of a referenced path
234                if not any(ref.startswith(current_path) for ref in all_refs):
235                    del section[key]
236
237        # Find all references in the manifest
238        all_refs: List[str] = []
239        find_all_refs(self._normalized_manifest, all_refs)
240
241        # Clean definitions
242        if DEF_TAG in self._normalized_manifest:
243            clean_section(self._normalized_manifest[DEF_TAG], DEF_TAG)
244            # Remove empty definitions section
245            if not self._normalized_manifest[DEF_TAG]:
246                del self._normalized_manifest[DEF_TAG]
247
248        # Clean schemas
249        if SCHEMAS_TAG in self._normalized_manifest:
250            clean_section(self._normalized_manifest[SCHEMAS_TAG], SCHEMAS_TAG)
251            # Remove empty schemas section
252            if not self._normalized_manifest[SCHEMAS_TAG]:
253                del self._normalized_manifest[SCHEMAS_TAG]
254
255    def _prepare_definitions(self) -> None:
256        """
257        Clean the definitions in the manifest by removing unnecessary properties.
258        This function modifies the manifest in place.
259        """
260
261        # Check if the definitions tag exists
262        if not DEF_TAG in self._normalized_manifest:
263            self._normalized_manifest[DEF_TAG] = {}
264
265        # Check if the linked tag exists
266        if not LINKED_TAG in self._normalized_manifest[DEF_TAG]:
267            self._normalized_manifest[DEF_TAG][LINKED_TAG] = {}
268
269        # remove everything from definitions tag except of `linked`, after processing
270        for key in list(self._normalized_manifest[DEF_TAG].keys()):
271            if key != LINKED_TAG:
272                self._normalized_manifest[DEF_TAG].pop(key, None)
273
274    def _replace_duplicates_with_refs(self, duplicates: DuplicatesType) -> None:
275        """
276        Process duplicate objects and replace them with references.
277
278        Args:
279            duplicates: The duplicates dictionary collected from the given manifest.
280        """
281
282        for _, occurrences in duplicates.items():
283            type_key, key, value = self._get_occurance_samples(occurrences)
284            is_linked_def = self._is_linked_definition(type_key, key)
285
286            # Add to definitions if not there already
287            if not is_linked_def:
288                self._add_to_linked_definitions(type_key, key, value)
289
290            # Replace occurrences with references
291            for _, parent_obj, value in occurrences:
292                if is_linked_def:
293                    if value == self._get_linked_definition_value(type_key, key):
294                        parent_obj[key] = self._create_linked_definition_ref(type_key, key)
295                else:
296                    parent_obj[key] = self._create_linked_definition_ref(type_key, key)
297
298    def _handle_duplicates(self, duplicates: DuplicatesType) -> None:
299        """
300        Process the duplicates and replace them with references.
301
302        Args:
303            duplicates: The duplicates dictionary collected from the given manifest.
304        """
305
306        if len(duplicates) > 0:
307            self._replace_duplicates_with_refs(duplicates)
308
309    def _add_duplicate(
310        self,
311        duplicates: DuplicatesType,
312        current_path: List[str],
313        obj: Dict[str, Any],
314        value: Any,
315        key: Optional[str] = None,
316    ) -> None:
317        """
318        Adds a duplicate record of an observed object by computing a unique hash for the provided value.
319
320        This function computes a hash for the given value (or a dictionary composed of the key and value if a key is provided)
321        and appends a tuple containing the current path, the original object, and the value to the duplicates
322        dictionary under the corresponding hash.
323
324        Parameters:
325            duplicates (DuplicatesType): The dictionary to store duplicate records.
326            current_path (List[str]): The list of keys or indices representing the current location in the object hierarchy.
327            obj (Dict): The original dictionary object where the duplicate is observed.
328            value (Any): The value to be hashed and used for identifying duplicates.
329            key (Optional[str]): An optional key that, if provided, wraps the value in a dictionary before hashing.
330        """
331
332        # create hash for each duplicate observed
333        value_to_hash = {key: value} if key is not None else value
334        duplicates[self._hash_object(value_to_hash)].append((current_path, obj, value))
335
336    def _add_to_linked_definitions(
337        self,
338        type_key: str,
339        key: str,
340        value: Any,
341    ) -> None:
342        """
343        Add a value to the linked definitions under the specified key.
344
345        Args:
346            definitions: The definitions dictionary to modify
347            key: The key to use
348            value: The value to add
349        """
350        if type_key not in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys():
351            self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key] = {}
352
353        if key not in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys():
354            self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key][key] = value
355
356    def _collect_duplicates(self) -> DuplicatesType:
357        """
358        Traverse the JSON object and collect all potential duplicate values and objects.
359
360        Returns:
361            duplicates: A dictionary of duplicate objects.
362        """
363
364        def _collect(obj: Dict[str, Any], path: Optional[List[str]] = None) -> None:
365            """
366            The closure to recursively collect duplicates in the JSON object.
367
368            Args:
369                obj: The current object being analyzed.
370                path: The current path in the object hierarchy.
371            """
372
373            if not isinstance(obj, dict):
374                return
375
376            path = [] if path is None else path
377            # Check if the object is empty
378            for key, value in obj.items():
379                # do not collect duplicates from `definitions` tag
380                if key == DEF_TAG:
381                    continue
382
383                current_path = path + [key]
384
385                if isinstance(value, dict):
386                    # First process nested dictionaries
387                    _collect(value, current_path)
388                    # Process allowed-only component tags
389                    if key in self._linkable_tags:
390                        self._add_duplicate(duplicates, current_path, obj, value)
391
392                # handle primitive types
393                elif isinstance(value, (str, int, float, bool)):
394                    # Process allowed-only field tags
395                    if key in self._linkable_tags:
396                        self._add_duplicate(duplicates, current_path, obj, value, key)
397
398                # handle list cases
399                elif isinstance(value, list):
400                    for i, item in enumerate(value):
401                        _collect(item, current_path + [str(i)])
402
403        duplicates: DuplicatesType = defaultdict(list, {})
404        try:
405            if self._linkable_tags:
406                _collect(self._normalized_manifest)
407                # clean non-duplicates and sort based on the count of occurrences
408                return self._clean_and_sort_duplicates(duplicates)
409            return duplicates
410        except Exception as e:
411            raise ManifestNormalizationException(str(e))
412
413    def _clean_and_sort_duplicates(self, duplicates: DuplicatesType) -> DuplicatesType:
414        """
415        Clean non-duplicates and sort the duplicates by their occurrences.
416
417        Args:
418            duplicates: The duplicates dictionary to sort
419
420        Returns:
421            A sorted duplicates dictionary.
422        """
423
424        # clean non-duplicates
425        duplicates = defaultdict(
426            list,
427            {k: v for k, v in duplicates.items() if len(v) >= N_OCCURANCES},
428        )
429
430        # sort the duplicates by their occurrences, more frequent ones go first
431        duplicates = defaultdict(
432            list,
433            {k: v for k, v in sorted(duplicates.items(), key=lambda x: len(x[1]), reverse=True)},
434        )
435
436        return duplicates
437
438    def _hash_object(self, obj: Dict[str, Any]) -> str:
439        """
440        Create a unique hash for a dictionary object.
441
442        Args:
443            node: The dictionary to hash
444
445        Returns:
446            A hashed string
447        """
448
449        # Sort keys to ensure consistent hash for same content
450        return hashlib.md5(json.dumps(obj, sort_keys=True).encode()).hexdigest()
451
452    def _is_linked_definition(self, type_key: str, key: str) -> bool:
453        """
454        Check if the key already exists in the linked definitions.
455
456        Args:
457            key: The key to check
458            definitions: The definitions dictionary with definitions
459
460        Returns:
461            True if the key exists in the linked definitions, False otherwise
462        """
463
464        if type_key in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys():
465            # Check if the key exists in the linked definitions
466            if key in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys():
467                return True
468
469        return False
470
471    def _get_linked_definition_value(self, type_key: str, key: str) -> Any:
472        """
473        Get the value of a linked definition by its key.
474
475        Args:
476            key: The key to check
477            definitions: The definitions dictionary with definitions
478
479        Returns:
480            The value of the linked definition
481        """
482        if type_key in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys():
483            if key in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys():
484                return self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key][key]
485        else:
486            raise ManifestNormalizationException(
487                f"Key {key} not found in linked definitions. Please check the manifest."
488            )
489
490    def _get_occurance_samples(self, occurrences: DuplicateOccurancesType) -> Tuple[str, str, Any]:
491        """
492        Get the key from the occurrences list.
493
494        Args:
495            occurrences: The occurrences list
496
497        Returns:
498            The key, type and value from the occurrences
499        """
500
501        # Take the value from the first occurrence, as they are the same
502        path, obj, value = occurrences[0]
503        return (
504            obj["type"],
505            path[-1],
506            value,
507        )  # Return the component's name as the last part of its path
508
509    def _create_linked_definition_ref(self, type_key: str, key: str) -> Dict[str, str]:
510        """
511        Create a reference object for the linked definitions using the specified key.
512
513        Args:
514            ref_key: The reference key to use
515
516        Returns:
517            A reference object in the proper format
518        """
519
520        return {"$ref": f"#/{DEF_TAG}/{LINKED_TAG}/{type_key}/{key}"}
This class is responsible for normalizing the manifest by applying processing such as:
  • removing duplicated definitions
  • replacing them with references.

To extend the functionality, use the normalize() method to include any additional processing steps.

ManifestNormalizer(resolved_manifest: Dict[str, Any], declarative_schema: Dict[str, Any])
82    def __init__(
83        self,
84        resolved_manifest: ManifestType,
85        declarative_schema: DefinitionsType,
86    ) -> None:
87        self._resolved_manifest = resolved_manifest
88        self._declarative_schema = declarative_schema
89        self._normalized_manifest: ManifestType = copy.deepcopy(self._resolved_manifest)
90        # get the tags marked as `linkable` in the component schema
91        self._linkable_tags = _get_linkable_schema_tags(self._declarative_schema)
def to_json_str(self) -> str:
93    def to_json_str(self) -> str:
94        return json.dumps(self._normalized_manifest, indent=2)
def normalize(self) -> Dict[str, Any]:
 96    def normalize(self) -> ManifestType:
 97        """
 98        Normalizes the manifest by deduplicating and resolving schema references.
 99
100        This method processes the manifest in two steps:
101        1. Deduplicates elements within the manifest
102        2. Resolves and references schemas
103
104        Returns:
105            ManifestType: The normalized manifest if processing succeeds,
106                          or the original resolved manifest if normalization fails.
107
108        Raises:
109            ManifestNormalizationException: Caught internally and handled by returning the original manifest.
110        """
111        try:
112            self._deduplicate_manifest()
113
114            return self._normalized_manifest
115        except ManifestNormalizationException:
116            # if any error occurs, we just return the original manifest.
117            # TODO: enable debug logging
118            return self._resolved_manifest

Normalizes the manifest by deduplicating and resolving schema references.

This method processes the manifest in two steps:

  1. Deduplicates elements within the manifest
  2. Resolves and references schemas
Returns:

ManifestType: The normalized manifest if processing succeeds, or the original resolved manifest if normalization fails.

Raises:
  • ManifestNormalizationException: Caught internally and handled by returning the original manifest.