airbyte_cdk.sources.declarative.parsers.manifest_component_transformer

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import copy
  6import typing
  7from typing import Any, Mapping, Optional
  8
  9PARAMETERS_STR = "$parameters"
 10
 11
 12DEFAULT_MODEL_TYPES: Mapping[str, str] = {
 13    # CompositeErrorHandler
 14    "CompositeErrorHandler.error_handlers": "DefaultErrorHandler",
 15    # CursorPagination
 16    "CursorPagination.decoder": "JsonDecoder",
 17    # DatetimeBasedCursor
 18    "DatetimeBasedCursor.end_datetime": "MinMaxDatetime",
 19    "DatetimeBasedCursor.end_time_option": "RequestOption",
 20    "DatetimeBasedCursor.start_datetime": "MinMaxDatetime",
 21    "DatetimeBasedCursor.start_time_option": "RequestOption",
 22    # CustomIncrementalSync
 23    "CustomIncrementalSync.end_datetime": "MinMaxDatetime",
 24    "CustomIncrementalSync.end_time_option": "RequestOption",
 25    "CustomIncrementalSync.start_datetime": "MinMaxDatetime",
 26    "CustomIncrementalSync.start_time_option": "RequestOption",
 27    # DeclarativeSource
 28    "DeclarativeSource.check": "CheckStream",
 29    "DeclarativeSource.spec": "Spec",
 30    "DeclarativeSource.streams": "DeclarativeStream",
 31    # DeclarativeStream
 32    "DeclarativeStream.retriever": "SimpleRetriever",
 33    "DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
 34    # DynamicDeclarativeStream
 35    "DynamicDeclarativeStream.stream_template": "DeclarativeStream",
 36    "DynamicDeclarativeStream.components_resolver": "ConfigComponentResolver",
 37    # HttpComponentsResolver
 38    "HttpComponentsResolver.retriever": "SimpleRetriever",
 39    "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
 40    # ConfigComponentResolver
 41    "ConfigComponentsResolver.stream_config": "StreamConfig",
 42    "ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition",
 43    # DefaultErrorHandler
 44    "DefaultErrorHandler.response_filters": "HttpResponseFilter",
 45    # DefaultPaginator
 46    "DefaultPaginator.decoder": "JsonDecoder",
 47    "DefaultPaginator.page_size_option": "RequestOption",
 48    # DpathExtractor
 49    "DpathExtractor.decoder": "JsonDecoder",
 50    # HttpRequester
 51    "HttpRequester.error_handler": "DefaultErrorHandler",
 52    # ListPartitionRouter
 53    "ListPartitionRouter.request_option": "RequestOption",
 54    # ParentStreamConfig
 55    "ParentStreamConfig.request_option": "RequestOption",
 56    "ParentStreamConfig.stream": "DeclarativeStream",
 57    # RecordSelector
 58    "RecordSelector.extractor": "DpathExtractor",
 59    "RecordSelector.record_filter": "RecordFilter",
 60    # SimpleRetriever
 61    "SimpleRetriever.paginator": "NoPagination",
 62    "SimpleRetriever.record_selector": "RecordSelector",
 63    "SimpleRetriever.requester": "HttpRequester",
 64    # SubstreamPartitionRouter
 65    "SubstreamPartitionRouter.parent_stream_configs": "ParentStreamConfig",
 66    # AddFields
 67    "AddFields.fields": "AddedFieldDefinition",
 68    # CustomPartitionRouter
 69    "CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
 70    # DynamicSchemaLoader
 71    "DynamicSchemaLoader.retriever": "SimpleRetriever",
 72    # SchemaTypeIdentifier
 73    "SchemaTypeIdentifier.types_map": "TypesMap",
 74}
 75
 76# We retain a separate registry for custom components to automatically insert the type if it is missing. This is intended to
 77# be a short term fix because once we have migrated, then type and class_name should be requirements for all custom components.
 78CUSTOM_COMPONENTS_MAPPING: Mapping[str, str] = {
 79    "CompositeErrorHandler.backoff_strategies": "CustomBackoffStrategy",
 80    "DeclarativeStream.retriever": "CustomRetriever",
 81    "DeclarativeStream.transformations": "CustomTransformation",
 82    "DefaultErrorHandler.backoff_strategies": "CustomBackoffStrategy",
 83    "DefaultPaginator.pagination_strategy": "CustomPaginationStrategy",
 84    "HttpRequester.authenticator": "CustomAuthenticator",
 85    "HttpRequester.error_handler": "CustomErrorHandler",
 86    "RecordSelector.extractor": "CustomRecordExtractor",
 87    "SimpleRetriever.partition_router": "CustomPartitionRouter",
 88}
 89
 90
 91class ManifestComponentTransformer:
 92    def propagate_types_and_parameters(
 93        self,
 94        parent_field_identifier: str,
 95        declarative_component: Mapping[str, Any],
 96        parent_parameters: Mapping[str, Any],
 97        use_parent_parameters: Optional[bool] = None,
 98    ) -> Mapping[str, Any]:
 99        """
100        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
101        default component type if it was not already present. The resulting transformed components are a deep copy of the input
102        components, not an in-place transformation.
103
104        :param declarative_component: The current component that is having type and parameters added
105        :param parent_field_identifier: The name of the field of the current component coming from the parent component
106        :param parent_parameters: The parameters set on parent components defined before the current component
107        :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
108        :return: A deep copy of the transformed component with types and parameters persisted to it
109        """
110        propagated_component = dict(copy.deepcopy(declarative_component))
111        if "type" not in propagated_component:
112            # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
113            # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
114            # class_name will only be a valid field on custom components and this change reflects that. I checked, and we currently
115            # have no low-code connectors that use class_name except for custom components.
116            if "class_name" in propagated_component:
117                found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
118            else:
119                found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
120            if found_type:
121                propagated_component["type"] = found_type
122
123        # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters
124        # When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could
125        # be json_schema are not objects but we believe this is not likely in our case because:
126        # * records are Mapping so objects hence SchemaLoader root should be an object
127        # * connection_specification is a Mapping
128        if "type" not in propagated_component or self._is_json_schema_object(propagated_component):
129            return propagated_component
130
131        # Combines parameters defined at the current level with parameters from parent components. Parameters at the current
132        # level take precedence
133        current_parameters = dict(copy.deepcopy(parent_parameters))
134        component_parameters = propagated_component.pop(PARAMETERS_STR, {})
135        current_parameters = (
136            {**component_parameters, **current_parameters}
137            if use_parent_parameters
138            else {**current_parameters, **component_parameters}
139        )
140
141        # Parameters should be applied to the current component fields with the existing field taking precedence over parameters if
142        # both exist
143        for parameter_key, parameter_value in current_parameters.items():
144            propagated_component[parameter_key] = (
145                propagated_component.get(parameter_key) or parameter_value
146            )
147
148        for field_name, field_value in propagated_component.items():
149            if isinstance(field_value, dict):
150                # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle
151                excluded_parameter = current_parameters.pop(field_name, None)
152                parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}"
153                propagated_component[field_name] = self.propagate_types_and_parameters(
154                    parent_type_field_identifier,
155                    field_value,
156                    current_parameters,
157                    use_parent_parameters=use_parent_parameters,
158                )
159                if excluded_parameter:
160                    current_parameters[field_name] = excluded_parameter
161            elif isinstance(field_value, typing.List):
162                # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle
163                excluded_parameter = current_parameters.pop(field_name, None)
164                for i, element in enumerate(field_value):
165                    if isinstance(element, dict):
166                        parent_type_field_identifier = (
167                            f"{propagated_component.get('type')}.{field_name}"
168                        )
169                        field_value[i] = self.propagate_types_and_parameters(
170                            parent_type_field_identifier,
171                            element,
172                            current_parameters,
173                            use_parent_parameters=use_parent_parameters,
174                        )
175                if excluded_parameter:
176                    current_parameters[field_name] = excluded_parameter
177
178        if current_parameters:
179            propagated_component[PARAMETERS_STR] = current_parameters
180        return propagated_component
181
182    @staticmethod
183    def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
184        return propagated_component.get("type") == "object"
# Manifest key under which a component declares parameters that are propagated to its subcomponents.
PARAMETERS_STR = "$parameters"

# Default component type per "<ParentType>.<field_name>", used when a subcomponent omits "type".
# NOTE(review): the "DynamicDeclarativeStream.components_resolver" value is spelled "ConfigComponentResolver" while the keys
# use "ConfigComponentsResolver" - confirm which spelling is intended.
DEFAULT_MODEL_TYPES: Mapping[str, str] = {
    "CompositeErrorHandler.error_handlers": "DefaultErrorHandler",
    "CursorPagination.decoder": "JsonDecoder",
    "DatetimeBasedCursor.end_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.end_time_option": "RequestOption",
    "DatetimeBasedCursor.start_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.start_time_option": "RequestOption",
    "CustomIncrementalSync.end_datetime": "MinMaxDatetime",
    "CustomIncrementalSync.end_time_option": "RequestOption",
    "CustomIncrementalSync.start_datetime": "MinMaxDatetime",
    "CustomIncrementalSync.start_time_option": "RequestOption",
    "DeclarativeSource.check": "CheckStream",
    "DeclarativeSource.spec": "Spec",
    "DeclarativeSource.streams": "DeclarativeStream",
    "DeclarativeStream.retriever": "SimpleRetriever",
    "DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
    "DynamicDeclarativeStream.stream_template": "DeclarativeStream",
    "DynamicDeclarativeStream.components_resolver": "ConfigComponentResolver",
    "HttpComponentsResolver.retriever": "SimpleRetriever",
    "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
    "ConfigComponentsResolver.stream_config": "StreamConfig",
    "ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition",
    "DefaultErrorHandler.response_filters": "HttpResponseFilter",
    "DefaultPaginator.decoder": "JsonDecoder",
    "DefaultPaginator.page_size_option": "RequestOption",
    "DpathExtractor.decoder": "JsonDecoder",
    "HttpRequester.error_handler": "DefaultErrorHandler",
    "ListPartitionRouter.request_option": "RequestOption",
    "ParentStreamConfig.request_option": "RequestOption",
    "ParentStreamConfig.stream": "DeclarativeStream",
    "RecordSelector.extractor": "DpathExtractor",
    "RecordSelector.record_filter": "RecordFilter",
    "SimpleRetriever.paginator": "NoPagination",
    "SimpleRetriever.record_selector": "RecordSelector",
    "SimpleRetriever.requester": "HttpRequester",
    "SubstreamPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    "AddFields.fields": "AddedFieldDefinition",
    "CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    "DynamicSchemaLoader.retriever": "SimpleRetriever",
    "SchemaTypeIdentifier.types_map": "TypesMap",
}

# Custom component type per "<ParentType>.<field_name>", used when a subcomponent has class_name but omits "type".
CUSTOM_COMPONENTS_MAPPING: Mapping[str, str] = {
    "CompositeErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DeclarativeStream.retriever": "CustomRetriever",
    "DeclarativeStream.transformations": "CustomTransformation",
    "DefaultErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DefaultPaginator.pagination_strategy": "CustomPaginationStrategy",
    "HttpRequester.authenticator": "CustomAuthenticator",
    "HttpRequester.error_handler": "CustomErrorHandler",
    "RecordSelector.extractor": "CustomRecordExtractor",
    "SimpleRetriever.partition_router": "CustomPartitionRouter",
}
class ManifestComponentTransformer:
    """Fills in missing component types and propagates ``$parameters`` through a declarative component tree."""

    def propagate_types_and_parameters(
        self,
        parent_field_identifier: str,
        declarative_component: Mapping[str, Any],
        parent_parameters: Mapping[str, Any],
        use_parent_parameters: Optional[bool] = None,
    ) -> Mapping[str, Any]:
        """
        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
        default component type if it was not already present. The resulting transformed components are a deep copy of the input
        components, not an in-place transformation.

        :param parent_field_identifier: The name of the field of the current component coming from the parent component,
            in the form "<ParentType>.<field_name>"; used as the lookup key for the default type
        :param declarative_component: The current component that is having type and parameters added
        :param parent_parameters: The parameters set on parent components defined before the current component
        :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
        :return: A deep copy of the transformed component with types and parameters persisted to it
        """
        propagated_component = dict(copy.deepcopy(declarative_component))
        if "type" not in propagated_component:
            # A component carrying class_name is assumed to reference a custom component, so its type comes from the custom
            # registry; anything else falls back to the default model types.
            type_registry = (
                CUSTOM_COMPONENTS_MAPPING
                if "class_name" in propagated_component
                else DEFAULT_MODEL_TYPES
            )
            found_type = type_registry.get(parent_field_identifier)
            if found_type:
                propagated_component["type"] = found_type

        # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to
        # propagate parameters. When the type refers to a json schema, we're not processing a component either. This check is
        # imperfect because a json schema does not have to be an object, but that is unlikely here because:
        # * records are Mappings, so the SchemaLoader root should be an object
        # * connection_specification is a Mapping
        if "type" not in propagated_component or self._is_json_schema_object(propagated_component):
            return propagated_component

        # Combine parameters defined at the current level with parameters from parent components. Parameters at the current
        # level take precedence unless use_parent_parameters is set. An explicit null "$parameters" is treated as empty.
        component_parameters = propagated_component.pop(PARAMETERS_STR, None) or {}
        inherited_parameters = dict(copy.deepcopy(parent_parameters))
        current_parameters = (
            {**component_parameters, **inherited_parameters}
            if use_parent_parameters
            else {**inherited_parameters, **component_parameters}
        )

        # Parameters only fill fields that are missing (or null); a field explicitly set on the component wins even when
        # its value is falsy (False, 0, "", []).
        for parameter_key, parameter_value in current_parameters.items():
            if propagated_component.get(parameter_key) is None:
                propagated_component[parameter_key] = parameter_value

        component_type = propagated_component.get("type")
        for field_name, field_value in propagated_component.items():
            if not isinstance(field_value, (dict, list)):
                continue

            child_field_identifier = f"{component_type}.{field_name}"
            # A parameter named like the current field is withheld from that field's subtree because propagating it would
            # result in an infinite cycle. A filtered copy is used so the parameter stays available, whatever its value,
            # to sibling fields and to the "$parameters" written back below.
            child_parameters = {
                key: value for key, value in current_parameters.items() if key != field_name
            }
            if isinstance(field_value, dict):
                propagated_component[field_name] = self.propagate_types_and_parameters(
                    child_field_identifier,
                    field_value,
                    child_parameters,
                    use_parent_parameters=use_parent_parameters,
                )
            else:
                # Only mapping elements are components; other list elements are left untouched.
                for index, element in enumerate(field_value):
                    if isinstance(element, dict):
                        field_value[index] = self.propagate_types_and_parameters(
                            child_field_identifier,
                            element,
                            child_parameters,
                            use_parent_parameters=use_parent_parameters,
                        )

        if current_parameters:
            propagated_component[PARAMETERS_STR] = current_parameters
        return propagated_component

    @staticmethod
    def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
        """Return True when the mapping's "type" is "object", i.e. it looks like a JSON schema rather than a component."""
        return propagated_component.get("type") == "object"
def propagate_types_and_parameters(
    self,
    parent_field_identifier: str,
    declarative_component: Mapping[str, Any],
    parent_parameters: Mapping[str, Any],
    use_parent_parameters: Optional[bool] = None,
) -> Mapping[str, Any]:
    """
    Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
    default component type if it was not already present. The resulting transformed components are a deep copy of the input
    components, not an in-place transformation.

    :param parent_field_identifier: The name of the field of the current component coming from the parent component,
        in the form "<ParentType>.<field_name>"; used as the lookup key for the default type
    :param declarative_component: The current component that is having type and parameters added
    :param parent_parameters: The parameters set on parent components defined before the current component
    :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
    :return: A deep copy of the transformed component with types and parameters persisted to it
    """
    propagated_component = dict(copy.deepcopy(declarative_component))
    if "type" not in propagated_component:
        # A component carrying class_name is assumed to reference a custom component, so its type comes from the custom
        # registry; anything else falls back to the default model types.
        type_registry = (
            CUSTOM_COMPONENTS_MAPPING
            if "class_name" in propagated_component
            else DEFAULT_MODEL_TYPES
        )
        found_type = type_registry.get(parent_field_identifier)
        if found_type:
            propagated_component["type"] = found_type

    # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to
    # propagate parameters. When the type refers to a json schema, we're not processing a component either. This check is
    # imperfect because a json schema does not have to be an object, but that is unlikely here because:
    # * records are Mappings, so the SchemaLoader root should be an object
    # * connection_specification is a Mapping
    if "type" not in propagated_component or self._is_json_schema_object(propagated_component):
        return propagated_component

    # Combine parameters defined at the current level with parameters from parent components. Parameters at the current
    # level take precedence unless use_parent_parameters is set. An explicit null "$parameters" is treated as empty.
    component_parameters = propagated_component.pop(PARAMETERS_STR, None) or {}
    inherited_parameters = dict(copy.deepcopy(parent_parameters))
    current_parameters = (
        {**component_parameters, **inherited_parameters}
        if use_parent_parameters
        else {**inherited_parameters, **component_parameters}
    )

    # Parameters only fill fields that are missing (or null); a field explicitly set on the component wins even when
    # its value is falsy (False, 0, "", []).
    for parameter_key, parameter_value in current_parameters.items():
        if propagated_component.get(parameter_key) is None:
            propagated_component[parameter_key] = parameter_value

    component_type = propagated_component.get("type")
    for field_name, field_value in propagated_component.items():
        if not isinstance(field_value, (dict, list)):
            continue

        child_field_identifier = f"{component_type}.{field_name}"
        # A parameter named like the current field is withheld from that field's subtree because propagating it would
        # result in an infinite cycle. A filtered copy is used so the parameter stays available, whatever its value,
        # to sibling fields and to the "$parameters" written back below.
        child_parameters = {
            key: value for key, value in current_parameters.items() if key != field_name
        }
        if isinstance(field_value, dict):
            propagated_component[field_name] = self.propagate_types_and_parameters(
                child_field_identifier,
                field_value,
                child_parameters,
                use_parent_parameters=use_parent_parameters,
            )
        else:
            # Only mapping elements are components; other list elements are left untouched.
            for index, element in enumerate(field_value):
                if isinstance(element, dict):
                    field_value[index] = self.propagate_types_and_parameters(
                        child_field_identifier,
                        element,
                        child_parameters,
                        use_parent_parameters=use_parent_parameters,
                    )

    if current_parameters:
        propagated_component[PARAMETERS_STR] = current_parameters
    return propagated_component

Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the default component type if it was not already present. The resulting transformed components are a deep copy of the input components, not an in-place transformation.

Parameters
  • declarative_component: The current component that is having type and parameters added
  • parent_field_identifier: The name of the field of the current component coming from the parent component
  • parent_parameters: The parameters set on parent components defined before the current component
  • use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
Returns

A deep copy of the transformed component with types and parameters persisted to it