airbyte_cdk.sources.declarative.parsers.manifest_normalizer

#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import copy
import hashlib
import json
from collections import defaultdict
from itertools import chain
from typing import Any, Callable, DefaultDict, Dict, Iterable, List, Optional, Tuple

from airbyte_cdk.sources.declarative.parsers.custom_exceptions import ManifestNormalizationException

# Type definitions for better readability
ManifestType = Dict[str, Any]
DefinitionsType = Dict[str, Any]
DuplicateOccurancesType = List[Tuple[List[str], Dict[str, Any], Dict[str, Any]]]
DuplicatesType = DefaultDict[str, DuplicateOccurancesType]

# Configuration constants
N_OCCURANCES = 2

DEF_TAG = "definitions"
LINKABLE_TAG = "linkable"
LINKED_TAG = "linked"
PROPERTIES_TAG = "properties"
SCHEMA_LOADER_TAG = "schema_loader"
SCHEMA_TAG = "schema"
SCHEMAS_TAG = "schemas"
STREAMS_TAG = "streams"


def _get_linkable_schema_tags(schema: DefinitionsType) -> List[str]:
    """
    Extracts linkable tags from schema definitions.
    This function identifies properties within a schema's definitions that are marked as linkable.
    It traverses through each definition in the schema, examines its properties, and collects
    the keys of properties that contain the LINKABLE_TAG.

    Args:
        schema (DefinitionsType): The schema definition dictionary to process

    Returns:
        List[str]: A deduplicated list of property keys that are marked as linkable
    """

    # the linkable scope: ['definitions.*']
    schema_definitions = schema.get(DEF_TAG, {})

    linkable_tags: List[str] = []

    # Extract linkable keys from properties
    extract_linkable_keys: Callable[[Dict[str, Dict[str, Any]]], List[str]] = lambda properties: [
        key for key, value in properties.items() if LINKABLE_TAG in value.keys()
    ]

    # Process each root value to get its linkable keys
    process_root: Callable[[Dict[str, Any]], List[str]] = lambda root_value: extract_linkable_keys(
        root_value.get(PROPERTIES_TAG, {})
    )

    # Map the process_root function over all schema values and flatten the results
    all_linkable_tags = chain.from_iterable(map(process_root, schema_definitions.values()))

    # Add all found linkable tags to the tags list
    linkable_tags.extend(all_linkable_tags)

    # return unique tags only
    return list(set(linkable_tags))

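# Illustrative sketch (not part of the module source): given a declarative component schema
# fragment such as
#
#   {"definitions": {"SomeComponent": {"properties": {
#       "authenticator": {"linkable": True},
#       "name": {"type": "string"}}}}}
#
# `_get_linkable_schema_tags` would return `["authenticator"]`, since only that property
# carries the `linkable` marker. The component and property names here are hypothetical.
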
class ManifestNormalizer:
    """
    This class is responsible for normalizing the manifest by applying processing such as:
     - removing duplicated definitions
     - replacing them with references.

    To extend the functionality, add any additional processing steps to the `normalize()` method.
    """

    def __init__(
        self,
        resolved_manifest: ManifestType,
        declarative_schema: DefinitionsType,
    ) -> None:
        self._resolved_manifest = resolved_manifest
        self._declarative_schema = declarative_schema
        self._normalized_manifest: ManifestType = copy.deepcopy(self._resolved_manifest)
        # get the tags marked as `linkable` in the component schema
        self._linkable_tags = _get_linkable_schema_tags(self._declarative_schema)

    def to_json_str(self) -> str:
        return json.dumps(self._normalized_manifest, indent=2)

    def normalize(self) -> ManifestType:
        """
        Normalizes the manifest by deduplicating and resolving schema references.

        This method processes the manifest in two steps:
        1. Deduplicates elements within the manifest
        2. Extracts stream schemas into the `schemas` section and replaces them with references

        Returns:
            ManifestType: The normalized manifest if processing succeeds,
                          or the original resolved manifest if normalization fails.

        Raises:
            ManifestNormalizationException: Caught internally and handled by returning the original manifest.
        """
        try:
            self._deduplicate_minifest()
            self._reference_schemas()

            return self._normalized_manifest
        except ManifestNormalizationException:
            # if any error occurs, we just return the original manifest.
            # TODO: enable debug logging
            return self._resolved_manifest

    def _get_manifest_streams(self) -> Iterable[Dict[str, Any]]:
        """
        Get the streams from the manifest.

        Returns:
            An Iterable of streams.
        """

        if STREAMS_TAG in self._normalized_manifest.keys():
            for stream in self._normalized_manifest[STREAMS_TAG]:
                yield stream

        yield from []

    def _deduplicate_minifest(self) -> None:
        """
        Find commonalities in the input JSON structure and refactor it to avoid redundancy.
        """

        try:
            # prepare the `definitions` tag
            self._prepare_definitions()
            # replace duplicates with references, if any
            self._handle_duplicates(self._collect_duplicates())
        except Exception as e:
            raise ManifestNormalizationException(str(e))

    def _prepare_definitions(self) -> None:
        """
        Clean the definitions in the manifest by removing unnecessary properties.
        This function modifies the manifest in place.
        """

        # Check if the definitions tag exists
        if DEF_TAG not in self._normalized_manifest:
            self._normalized_manifest[DEF_TAG] = {}

        # Check if the linked tag exists
        if LINKED_TAG not in self._normalized_manifest[DEF_TAG]:
            self._normalized_manifest[DEF_TAG][LINKED_TAG] = {}

        # remove everything from the definitions tag except for `linked`
        for key in list(self._normalized_manifest[DEF_TAG].keys()):
            if key != LINKED_TAG:
                self._normalized_manifest[DEF_TAG].pop(key, None)

    def _extract_stream_schema(self, stream: Dict[str, Any]) -> None:
        """
        Extract the schema from the stream and add it to the `schemas` tag.
        """

        stream_name = stream["name"]
        # copy the value of the SCHEMA_TAG to the SCHEMAS_TAG with the stream name as key
        schema = stream.get(SCHEMA_LOADER_TAG, {}).get(SCHEMA_TAG)
        if SCHEMAS_TAG not in self._normalized_manifest.keys():
            self._normalized_manifest[SCHEMAS_TAG] = {}
        # add the stream schema to the SCHEMAS_TAG with the stream name as key
        if stream_name not in self._normalized_manifest[SCHEMAS_TAG].keys():
            self._normalized_manifest[SCHEMAS_TAG][stream_name] = schema

    def _reference_schemas(self) -> None:
        """
        Extract each stream's schema into the `schemas` section and replace it with a reference.
        This function modifies the manifest in place.
        """

        # reference each stream's schema to the place where it's stored
        if SCHEMAS_TAG in self._normalized_manifest.keys():
            for stream in self._get_manifest_streams():
                self._extract_stream_schema(stream)
                self._set_stream_schema_ref(stream)

    def _set_stream_schema_ref(self, stream: Dict[str, Any]) -> None:
        """
        Set the schema reference for the given stream in the manifest.
        This function modifies the manifest in place.
        """
        stream_name = stream["name"]
        if SCHEMAS_TAG in self._normalized_manifest.keys():
            if stream_name in self._normalized_manifest[SCHEMAS_TAG]:
                stream[SCHEMA_LOADER_TAG][SCHEMA_TAG] = self._create_schema_ref(stream_name)

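    # Illustrative sketch (hypothetical stream name): after `_extract_stream_schema` and
    # `_set_stream_schema_ref` run for a stream named "my_stream", the full schema body lives
    # under the top-level `schemas.my_stream` entry, while the stream itself carries only
    #     {"schema_loader": {"schema": {"$ref": "#/schemas/my_stream"}}}
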
    def _replace_duplicates_with_refs(self, duplicates: DuplicatesType) -> None:
        """
        Process duplicate objects and replace them with references.

        Args:
            duplicates: The duplicates dictionary collected from the given manifest.
        """

        for _, occurrences in duplicates.items():
            type_key, key, value = self._get_occurance_samples(occurrences)
            is_linked_def = self._is_linked_definition(type_key, key)

            # Add to definitions if not there already
            if not is_linked_def:
                self._add_to_linked_definitions(type_key, key, value)

            # Replace occurrences with references
            for _, parent_obj, value in occurrences:
                if is_linked_def:
                    if value == self._get_linked_definition_value(type_key, key):
                        parent_obj[key] = self._create_linked_definition_ref(type_key, key)
                else:
                    parent_obj[key] = self._create_linked_definition_ref(type_key, key)

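    # Illustrative sketch (hypothetical component type and field): if two streams share an
    # identical `authenticator` block whose parent component has `type: HttpRequester`, the
    # block is stored once under `definitions.linked.HttpRequester.authenticator` and every
    # occurrence is replaced with
    #     {"$ref": "#/definitions/linked/HttpRequester/authenticator"}
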
    def _handle_duplicates(self, duplicates: DuplicatesType) -> None:
        """
        Process the duplicates and replace them with references.

        Args:
            duplicates: The duplicates dictionary collected from the given manifest.
        """

        if len(duplicates) > 0:
            self._replace_duplicates_with_refs(duplicates)

    def _add_duplicate(
        self,
        duplicates: DuplicatesType,
        current_path: List[str],
        obj: Dict[str, Any],
        value: Any,
        key: Optional[str] = None,
    ) -> None:
        """
        Adds a duplicate record of an observed object by computing a unique hash for the provided value.

        This function computes a hash for the given value (or a dictionary composed of the key and value if a key is provided)
        and appends a tuple containing the current path, the original object, and the value to the duplicates
        dictionary under the corresponding hash.

        Parameters:
            duplicates (DuplicatesType): The dictionary to store duplicate records.
            current_path (List[str]): The list of keys or indices representing the current location in the object hierarchy.
            obj (Dict): The original dictionary object where the duplicate is observed.
            value (Any): The value to be hashed and used for identifying duplicates.
            key (Optional[str]): An optional key that, if provided, wraps the value in a dictionary before hashing.
        """

        # create hash for each duplicate observed
        value_to_hash = {key: value} if key is not None else value
        duplicates[self._hash_object(value_to_hash)].append((current_path, obj, value))

    def _add_to_linked_definitions(
        self,
        type_key: str,
        key: str,
        value: Any,
    ) -> None:
        """
        Add a value to the linked definitions under the specified type and key.

        Args:
            type_key: The component type to group the definition under
            key: The key to use
            value: The value to add
        """
        if type_key not in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys():
            self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key] = {}

        if key not in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys():
            self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key][key] = value

    def _collect_duplicates(self) -> DuplicatesType:
        """
        Traverse the JSON object and collect all potential duplicate values and objects.

        Returns:
            duplicates: A dictionary of duplicate objects.
        """

        def _collect(obj: Dict[str, Any], path: Optional[List[str]] = None) -> None:
            """
            The closure to recursively collect duplicates in the JSON object.

            Args:
                obj: The current object being analyzed.
                path: The current path in the object hierarchy.
            """

            if not isinstance(obj, dict):
                return

            path = [] if path is None else path
            # Walk through the object's key / value pairs
            for key, value in obj.items():
                # do not collect duplicates from the `definitions` tag
                if key == DEF_TAG:
                    continue

                current_path = path + [key]

                if isinstance(value, dict):
                    # First process nested dictionaries
                    _collect(value, current_path)
                    # Process allowed-only component tags
                    if key in self._linkable_tags:
                        self._add_duplicate(duplicates, current_path, obj, value)

                # handle primitive types
                elif isinstance(value, (str, int, float, bool)):
                    # Process allowed-only field tags
                    if key in self._linkable_tags:
                        self._add_duplicate(duplicates, current_path, obj, value, key)

                # handle list cases
                elif isinstance(value, list):
                    for i, item in enumerate(value):
                        _collect(item, current_path + [str(i)])

        duplicates: DuplicatesType = defaultdict(list, {})
        try:
            if self._linkable_tags:
                _collect(self._normalized_manifest)
                # clean non-duplicates and sort based on the count of occurrences
                return self._clean_and_sort_duplicates(duplicates)
            return duplicates
        except Exception as e:
            raise ManifestNormalizationException(str(e))

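    # Illustrative note: `duplicates` maps an MD5 hash of each linkable value to the list of
    # (path, parent object, value) tuples where that value was seen, so two entries under the
    # same hash are candidates for replacement with a single linked definition.
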
    def _clean_and_sort_duplicates(self, duplicates: DuplicatesType) -> DuplicatesType:
        """
        Clean non-duplicates and sort the duplicates by their occurrences.

        Args:
            duplicates: The duplicates dictionary to sort

        Returns:
            A sorted duplicates dictionary.
        """

        # clean non-duplicates
        duplicates = defaultdict(
            list,
            {k: v for k, v in duplicates.items() if len(v) >= N_OCCURANCES},
        )

        # sort the duplicates by their occurrences, more frequent ones go first
        duplicates = defaultdict(
            list,
            {k: v for k, v in sorted(duplicates.items(), key=lambda x: len(x[1]), reverse=True)},
        )

        return duplicates

    def _hash_object(self, obj: Dict[str, Any]) -> str:
        """
        Create a unique hash for a dictionary object.

        Args:
            obj: The dictionary to hash

        Returns:
            A hashed string
        """

        # Sort keys to ensure consistent hash for same content
        return hashlib.md5(json.dumps(obj, sort_keys=True).encode()).hexdigest()

    def _is_linked_definition(self, type_key: str, key: str) -> bool:
        """
        Check if the key already exists in the linked definitions.

        Args:
            type_key: The component type the definition is grouped under
            key: The key to check

        Returns:
            True if the key exists in the linked definitions, False otherwise
        """

        if type_key in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys():
            # Check if the key exists in the linked definitions
            if key in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys():
                return True

        return False

    def _get_linked_definition_value(self, type_key: str, key: str) -> Any:
        """
        Get the value of a linked definition by its key.

        Args:
            type_key: The component type the definition is grouped under
            key: The key to look up

        Returns:
            The value of the linked definition
        """
        if type_key in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys():
            if key in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys():
                return self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key][key]
        else:
            raise ManifestNormalizationException(
                f"Key {key} not found in linked definitions. Please check the manifest."
            )

    def _get_occurance_samples(self, occurrences: DuplicateOccurancesType) -> Tuple[str, str, Any]:
        """
        Get the component type, key and value from the occurrences list.

        Args:
            occurrences: The occurrences list

        Returns:
            The type, key and value from the occurrences
        """

        # Take the value from the first occurrence, as they are the same
        path, obj, value = occurrences[0]
        return (
            obj["type"],
            path[-1],  # the component's name is the last part of its path
            value,
        )

    def _create_linked_definition_ref(self, type_key: str, key: str) -> Dict[str, str]:
        """
        Create a reference object for the linked definitions using the specified type and key.

        Args:
            type_key: The component type segment of the reference
            key: The reference key to use

        Returns:
            A reference object in the proper format
        """

        return {"$ref": f"#/{DEF_TAG}/{LINKED_TAG}/{type_key}/{key}"}

    def _create_schema_ref(self, key: str) -> Dict[str, str]:
        """
        Create a reference object for stream schema using the specified key.

        Args:
            key: The reference key to use

        Returns:
            A reference object in the proper format
        """

        return {"$ref": f"#/{SCHEMAS_TAG}/{key}"}
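
A minimal usage sketch, assuming you already hold a fully resolved manifest dictionary and the declarative component schema (both variable names below are placeholders):

    normalizer = ManifestNormalizer(resolved_manifest, declarative_schema)
    normalized = normalizer.normalize()  # falls back to the original manifest on failure
    print(normalizer.to_json_str())      # dump the normalized manifest as indented JSON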