airbyte_cdk.sources.declarative.parsers.manifest_component_transformer

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import copy
  6import typing
  7from typing import Any, Mapping
  8
  9PARAMETERS_STR = "$parameters"
 10
 11
 12DEFAULT_MODEL_TYPES: Mapping[str, str] = {
 13    # CompositeErrorHandler
 14    "CompositeErrorHandler.error_handlers": "DefaultErrorHandler",
 15    # CursorPagination
 16    "CursorPagination.decoder": "JsonDecoder",
 17    # DatetimeBasedCursor
 18    "DatetimeBasedCursor.end_datetime": "MinMaxDatetime",
 19    "DatetimeBasedCursor.end_time_option": "RequestOption",
 20    "DatetimeBasedCursor.start_datetime": "MinMaxDatetime",
 21    "DatetimeBasedCursor.start_time_option": "RequestOption",
 22    # CustomIncrementalSync
 23    "CustomIncrementalSync.end_datetime": "MinMaxDatetime",
 24    "CustomIncrementalSync.end_time_option": "RequestOption",
 25    "CustomIncrementalSync.start_datetime": "MinMaxDatetime",
 26    "CustomIncrementalSync.start_time_option": "RequestOption",
 27    # DeclarativeSource
 28    "DeclarativeSource.check": "CheckStream",
 29    "DeclarativeSource.spec": "Spec",
 30    "DeclarativeSource.streams": "DeclarativeStream",
 31    # DeclarativeStream
 32    "DeclarativeStream.retriever": "SimpleRetriever",
 33    "DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
 34    # DynamicDeclarativeStream
 35    "DynamicDeclarativeStream.stream_template": "DeclarativeStream",
 36    "DynamicDeclarativeStream.components_resolver": "ConfigComponentResolver",
 37    # HttpComponentsResolver
 38    "HttpComponentsResolver.retriever": "SimpleRetriever",
 39    "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
 40    # ConfigComponentResolver
 41    "ConfigComponentsResolver.stream_config": "StreamConfig",
 42    "ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition",
 43    # DefaultErrorHandler
 44    "DefaultErrorHandler.response_filters": "HttpResponseFilter",
 45    # DefaultPaginator
 46    "DefaultPaginator.decoder": "JsonDecoder",
 47    "DefaultPaginator.page_size_option": "RequestOption",
 48    # DpathExtractor
 49    "DpathExtractor.decoder": "JsonDecoder",
 50    # HttpRequester
 51    "HttpRequester.error_handler": "DefaultErrorHandler",
 52    # ListPartitionRouter
 53    "ListPartitionRouter.request_option": "RequestOption",
 54    # ParentStreamConfig
 55    "ParentStreamConfig.request_option": "RequestOption",
 56    "ParentStreamConfig.stream": "DeclarativeStream",
 57    # RecordSelector
 58    "RecordSelector.extractor": "DpathExtractor",
 59    "RecordSelector.record_filter": "RecordFilter",
 60    # SimpleRetriever
 61    "SimpleRetriever.paginator": "NoPagination",
 62    "SimpleRetriever.record_selector": "RecordSelector",
 63    "SimpleRetriever.requester": "HttpRequester",
 64    # SubstreamPartitionRouter
 65    "SubstreamPartitionRouter.parent_stream_configs": "ParentStreamConfig",
 66    # AddFields
 67    "AddFields.fields": "AddedFieldDefinition",
 68    # CustomPartitionRouter
 69    "CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
 70    # DynamicSchemaLoader
 71    "DynamicSchemaLoader.retriever": "SimpleRetriever",
 72    # SchemaTypeIdentifier
 73    "SchemaTypeIdentifier.types_map": "TypesMap",
 74}
 75
 76# We retain a separate registry for custom components to automatically insert the type if it is missing. This is intended to
 77# be a short term fix because once we have migrated, then type and class_name should be requirements for all custom components.
 78CUSTOM_COMPONENTS_MAPPING: Mapping[str, str] = {
 79    "CompositeErrorHandler.backoff_strategies": "CustomBackoffStrategy",
 80    "DeclarativeStream.retriever": "CustomRetriever",
 81    "DeclarativeStream.transformations": "CustomTransformation",
 82    "DefaultErrorHandler.backoff_strategies": "CustomBackoffStrategy",
 83    "DefaultPaginator.pagination_strategy": "CustomPaginationStrategy",
 84    "HttpRequester.authenticator": "CustomAuthenticator",
 85    "HttpRequester.error_handler": "CustomErrorHandler",
 86    "RecordSelector.extractor": "CustomRecordExtractor",
 87    "SimpleRetriever.partition_router": "CustomPartitionRouter",
 88}
 89
 90
 91class ManifestComponentTransformer:
 92    def propagate_types_and_parameters(
 93        self,
 94        parent_field_identifier: str,
 95        declarative_component: Mapping[str, Any],
 96        parent_parameters: Mapping[str, Any],
 97    ) -> Mapping[str, Any]:
 98        """
 99        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
100        default component type if it was not already present. The resulting transformed components are a deep copy of the input
101        components, not an in-place transformation.
102
103        :param declarative_component: The current component that is having type and parameters added
104        :param parent_field_identifier: The name of the field of the current component coming from the parent component
105        :param parent_parameters: The parameters set on parent components defined before the current component
106        :return: A deep copy of the transformed component with types and parameters persisted to it
107        """
108        propagated_component = dict(copy.deepcopy(declarative_component))
109        if "type" not in propagated_component:
110            # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
111            # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
112            # class_name will only be a valid field on custom components and this change reflects that. I checked, and we currently
113            # have no low-code connectors that use class_name except for custom components.
114            if "class_name" in propagated_component:
115                found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
116            else:
117                found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
118            if found_type:
119                propagated_component["type"] = found_type
120
121        # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters
122        # When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could
123        # be json_schema are not objects but we believe this is not likely in our case because:
124        # * records are Mapping so objects hence SchemaLoader root should be an object
125        # * connection_specification is a Mapping
126        if "type" not in propagated_component or self._is_json_schema_object(propagated_component):
127            return propagated_component
128
129        # Combines parameters defined at the current level with parameters from parent components. Parameters at the current
130        # level take precedence
131        current_parameters = dict(copy.deepcopy(parent_parameters))
132        component_parameters = propagated_component.pop(PARAMETERS_STR, {})
133        current_parameters = {**current_parameters, **component_parameters}
134
135        # Parameters should be applied to the current component fields with the existing field taking precedence over parameters if
136        # both exist
137        for parameter_key, parameter_value in current_parameters.items():
138            propagated_component[parameter_key] = (
139                propagated_component.get(parameter_key) or parameter_value
140            )
141
142        for field_name, field_value in propagated_component.items():
143            if isinstance(field_value, dict):
144                # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle
145                excluded_parameter = current_parameters.pop(field_name, None)
146                parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}"
147                propagated_component[field_name] = self.propagate_types_and_parameters(
148                    parent_type_field_identifier, field_value, current_parameters
149                )
150                if excluded_parameter:
151                    current_parameters[field_name] = excluded_parameter
152            elif isinstance(field_value, typing.List):
153                # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle
154                excluded_parameter = current_parameters.pop(field_name, None)
155                for i, element in enumerate(field_value):
156                    if isinstance(element, dict):
157                        parent_type_field_identifier = (
158                            f"{propagated_component.get('type')}.{field_name}"
159                        )
160                        field_value[i] = self.propagate_types_and_parameters(
161                            parent_type_field_identifier, element, current_parameters
162                        )
163                if excluded_parameter:
164                    current_parameters[field_name] = excluded_parameter
165
166        if current_parameters:
167            propagated_component[PARAMETERS_STR] = current_parameters
168        return propagated_component
169
170    @staticmethod
171    def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
172        return propagated_component.get("type") == "object"
# Key under which a component's own parameters are declared in the manifest.
PARAMETERS_STR = "$parameters"
# "<ParentType>.<field_name>" -> default component type inserted when a component omits "type".
DEFAULT_MODEL_TYPES: Mapping[str, str] = {
    "CompositeErrorHandler.error_handlers": "DefaultErrorHandler",
    "CursorPagination.decoder": "JsonDecoder",
    "DatetimeBasedCursor.end_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.end_time_option": "RequestOption",
    "DatetimeBasedCursor.start_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.start_time_option": "RequestOption",
    "CustomIncrementalSync.end_datetime": "MinMaxDatetime",
    "CustomIncrementalSync.end_time_option": "RequestOption",
    "CustomIncrementalSync.start_datetime": "MinMaxDatetime",
    "CustomIncrementalSync.start_time_option": "RequestOption",
    "DeclarativeSource.check": "CheckStream",
    "DeclarativeSource.spec": "Spec",
    "DeclarativeSource.streams": "DeclarativeStream",
    "DeclarativeStream.retriever": "SimpleRetriever",
    "DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
    "DynamicDeclarativeStream.stream_template": "DeclarativeStream",
    # Must match the "ConfigComponentsResolver.*" keys below so the resolver's own sub-fields get defaults too.
    "DynamicDeclarativeStream.components_resolver": "ConfigComponentsResolver",
    "HttpComponentsResolver.retriever": "SimpleRetriever",
    "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
    "ConfigComponentsResolver.stream_config": "StreamConfig",
    "ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition",
    "DefaultErrorHandler.response_filters": "HttpResponseFilter",
    "DefaultPaginator.decoder": "JsonDecoder",
    "DefaultPaginator.page_size_option": "RequestOption",
    "DpathExtractor.decoder": "JsonDecoder",
    "HttpRequester.error_handler": "DefaultErrorHandler",
    "ListPartitionRouter.request_option": "RequestOption",
    "ParentStreamConfig.request_option": "RequestOption",
    "ParentStreamConfig.stream": "DeclarativeStream",
    "RecordSelector.extractor": "DpathExtractor",
    "RecordSelector.record_filter": "RecordFilter",
    "SimpleRetriever.paginator": "NoPagination",
    "SimpleRetriever.record_selector": "RecordSelector",
    "SimpleRetriever.requester": "HttpRequester",
    "SubstreamPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    "AddFields.fields": "AddedFieldDefinition",
    "CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    "DynamicSchemaLoader.retriever": "SimpleRetriever",
    "SchemaTypeIdentifier.types_map": "TypesMap",
}
# Default types for components that declare a class_name but no type (custom components).
CUSTOM_COMPONENTS_MAPPING: Mapping[str, str] = {
    "CompositeErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DeclarativeStream.retriever": "CustomRetriever",
    "DeclarativeStream.transformations": "CustomTransformation",
    "DefaultErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DefaultPaginator.pagination_strategy": "CustomPaginationStrategy",
    "HttpRequester.authenticator": "CustomAuthenticator",
    "HttpRequester.error_handler": "CustomErrorHandler",
    "RecordSelector.extractor": "CustomRecordExtractor",
    "SimpleRetriever.partition_router": "CustomPartitionRouter",
}
class ManifestComponentTransformer:
    def propagate_types_and_parameters(
        self,
        parent_field_identifier: str,
        declarative_component: Mapping[str, Any],
        parent_parameters: Mapping[str, Any],
    ) -> Mapping[str, Any]:
        """
        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
        default component type if it was not already present. The resulting transformed components are a deep copy of the input
        components, not an in-place transformation.

        :param parent_field_identifier: "<ParentType>.<field_name>" key used to look up a default type for the current
            component in DEFAULT_MODEL_TYPES (or CUSTOM_COMPONENTS_MAPPING when the component has a class_name)
        :param declarative_component: The current component that is having type and parameters added
        :param parent_parameters: The parameters set on parent components defined before the current component
        :return: A deep copy of the transformed component with types and parameters persisted to it
        """
        propagated_component = dict(copy.deepcopy(declarative_component))
        if "type" not in propagated_component:
            # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
            # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
            # class_name will only be a valid field on custom components and this change reflects that. I checked, and we currently
            # have no low-code connectors that use class_name except for custom components.
            if "class_name" in propagated_component:
                found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
            else:
                found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
            if found_type:
                propagated_component["type"] = found_type

        # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters.
        # When the type refers to a json schema, we're not processing a component either. This check is currently imperfect as there could
        # be json schemas that are not objects, but we believe this is not likely in our case because:
        # * records are Mappings, i.e. objects, hence the SchemaLoader root should be an object
        # * connection_specification is a Mapping
        if "type" not in propagated_component or self._is_json_schema_object(propagated_component):
            return propagated_component

        # Combines parameters defined at the current level with parameters from parent components. Parameters at the current
        # level take precedence. `or {}` tolerates an explicitly empty `$parameters:` (None) in the manifest.
        current_parameters = {
            **copy.deepcopy(dict(parent_parameters)),
            **(propagated_component.pop(PARAMETERS_STR, None) or {}),
        }

        # Parameters are applied to the current component's fields, with an existing field taking precedence over a parameter
        # of the same name. Only absent or null fields are filled: a falsy but meaningful value set on the component itself
        # (False, 0, "", []) must not be overwritten.
        for parameter_key, parameter_value in current_parameters.items():
            if propagated_component.get(parameter_key) is None:
                propagated_component[parameter_key] = parameter_value

        for field_name, field_value in propagated_component.items():
            if not isinstance(field_value, (dict, list)):
                continue

            field_identifier = f"{propagated_component.get('type')}.{field_name}"
            # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle.
            # Presence is tracked separately so that a falsy parameter value is still restored afterwards.
            had_excluded_parameter = field_name in current_parameters
            excluded_parameter = current_parameters.pop(field_name, None)

            if isinstance(field_value, dict):
                propagated_component[field_name] = self.propagate_types_and_parameters(
                    field_identifier, field_value, current_parameters
                )
            else:
                # Only dict elements can be components; scalars in the list are left untouched.
                for index, element in enumerate(field_value):
                    if isinstance(element, dict):
                        field_value[index] = self.propagate_types_and_parameters(
                            field_identifier, element, current_parameters
                        )

            # Make the withheld parameter visible again to sibling fields and to the emitted $parameters.
            if had_excluded_parameter:
                current_parameters[field_name] = excluded_parameter

        if current_parameters:
            propagated_component[PARAMETERS_STR] = current_parameters
        return propagated_component

    @staticmethod
    def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
        """Return True when the mapping is a JSON schema object (``type: object``) rather than a declarative component."""
        return propagated_component.get("type") == "object"
def propagate_types_and_parameters(
    self,
    parent_field_identifier: str,
    declarative_component: Mapping[str, Any],
    parent_parameters: Mapping[str, Any],
) -> Mapping[str, Any]:
    """
    Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
    default component type if it was not already present. The resulting transformed components are a deep copy of the input
    components, not an in-place transformation.

    :param parent_field_identifier: "<ParentType>.<field_name>" key used to look up a default type for the current
        component in DEFAULT_MODEL_TYPES (or CUSTOM_COMPONENTS_MAPPING when the component has a class_name)
    :param declarative_component: The current component that is having type and parameters added
    :param parent_parameters: The parameters set on parent components defined before the current component
    :return: A deep copy of the transformed component with types and parameters persisted to it
    """
    propagated_component = dict(copy.deepcopy(declarative_component))
    if "type" not in propagated_component:
        # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
        # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
        # class_name will only be a valid field on custom components and this change reflects that. I checked, and we currently
        # have no low-code connectors that use class_name except for custom components.
        if "class_name" in propagated_component:
            found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
        else:
            found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
        if found_type:
            propagated_component["type"] = found_type

    # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters.
    # When the type refers to a json schema, we're not processing a component either. This check is currently imperfect as there could
    # be json schemas that are not objects, but we believe this is not likely in our case because:
    # * records are Mappings, i.e. objects, hence the SchemaLoader root should be an object
    # * connection_specification is a Mapping
    if "type" not in propagated_component or self._is_json_schema_object(propagated_component):
        return propagated_component

    # Combines parameters defined at the current level with parameters from parent components. Parameters at the current
    # level take precedence. `or {}` tolerates an explicitly empty `$parameters:` (None) in the manifest.
    current_parameters = {
        **copy.deepcopy(dict(parent_parameters)),
        **(propagated_component.pop(PARAMETERS_STR, None) or {}),
    }

    # Parameters are applied to the current component's fields, with an existing field taking precedence over a parameter
    # of the same name. Only absent or null fields are filled: a falsy but meaningful value set on the component itself
    # (False, 0, "", []) must not be overwritten.
    for parameter_key, parameter_value in current_parameters.items():
        if propagated_component.get(parameter_key) is None:
            propagated_component[parameter_key] = parameter_value

    for field_name, field_value in propagated_component.items():
        if not isinstance(field_value, (dict, list)):
            continue

        field_identifier = f"{propagated_component.get('type')}.{field_name}"
        # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle.
        # Presence is tracked separately so that a falsy parameter value is still restored afterwards.
        had_excluded_parameter = field_name in current_parameters
        excluded_parameter = current_parameters.pop(field_name, None)

        if isinstance(field_value, dict):
            propagated_component[field_name] = self.propagate_types_and_parameters(
                field_identifier, field_value, current_parameters
            )
        else:
            # Only dict elements can be components; scalars in the list are left untouched.
            for index, element in enumerate(field_value):
                if isinstance(element, dict):
                    field_value[index] = self.propagate_types_and_parameters(
                        field_identifier, element, current_parameters
                    )

        # Make the withheld parameter visible again to sibling fields and to the emitted $parameters.
        if had_excluded_parameter:
            current_parameters[field_name] = excluded_parameter

    if current_parameters:
        propagated_component[PARAMETERS_STR] = current_parameters
    return propagated_component

Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the default component type if it was not already present. The resulting transformed components are a deep copy of the input components, not an in-place transformation.

Parameters
  • declarative_component: The current component that is having type and parameters added
  • parent_field_identifier: The name of the field of the current component coming from the parent component
  • parent_parameters: The parameters set on parent components defined before the current component
Returns

A deep copy of the transformed component with types and parameters persisted to it