airbyte_cdk.sources.declarative.parsers.manifest_component_transformer

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import copy
  6import typing
  7from typing import Any, Dict, Mapping, Optional
  8
  9PARAMETERS_STR = "$parameters"
 10
 11
 12DEFAULT_MODEL_TYPES: Mapping[str, str] = {
 13    # CompositeErrorHandler
 14    "CompositeErrorHandler.error_handlers": "DefaultErrorHandler",
 15    # CursorPagination
 16    "CursorPagination.decoder": "JsonDecoder",
 17    # DatetimeBasedCursor
 18    "DatetimeBasedCursor.end_datetime": "MinMaxDatetime",
 19    "DatetimeBasedCursor.end_time_option": "RequestOption",
 20    "DatetimeBasedCursor.start_datetime": "MinMaxDatetime",
 21    "DatetimeBasedCursor.start_time_option": "RequestOption",
 22    # DeclarativeSource
 23    "DeclarativeSource.check": "CheckStream",
 24    "DeclarativeSource.spec": "Spec",
 25    "DeclarativeSource.streams": "DeclarativeStream",
 26    # DeclarativeStream
 27    "DeclarativeStream.retriever": "SimpleRetriever",
 28    "DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
 29    # DynamicDeclarativeStream
 30    "DynamicDeclarativeStream.stream_template": "DeclarativeStream",
 31    "DynamicDeclarativeStream.components_resolver": "ConfigComponentResolver",
 32    # HttpComponentsResolver
 33    "HttpComponentsResolver.retriever": "SimpleRetriever",
 34    "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
 35    # ConfigComponentResolver
 36    "ConfigComponentsResolver.stream_config": "StreamConfig",
 37    "ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition",
 38    # DefaultErrorHandler
 39    "DefaultErrorHandler.response_filters": "HttpResponseFilter",
 40    # DefaultPaginator
 41    "DefaultPaginator.decoder": "JsonDecoder",
 42    "DefaultPaginator.page_size_option": "RequestOption",
 43    # DpathExtractor
 44    "DpathExtractor.decoder": "JsonDecoder",
 45    "DpathExtractor.record_expander": "RecordExpander",
 46    # HttpRequester
 47    "HttpRequester.error_handler": "DefaultErrorHandler",
 48    # ListPartitionRouter
 49    "ListPartitionRouter.request_option": "RequestOption",
 50    # ParentStreamConfig
 51    "ParentStreamConfig.request_option": "RequestOption",
 52    "ParentStreamConfig.stream": "DeclarativeStream",
 53    # RecordSelector
 54    "RecordSelector.extractor": "DpathExtractor",
 55    "RecordSelector.record_filter": "RecordFilter",
 56    # SimpleRetriever
 57    "SimpleRetriever.paginator": "NoPagination",
 58    "SimpleRetriever.record_selector": "RecordSelector",
 59    "SimpleRetriever.requester": "HttpRequester",
 60    # SubstreamPartitionRouter
 61    "SubstreamPartitionRouter.parent_stream_configs": "ParentStreamConfig",
 62    # AddFields
 63    "AddFields.fields": "AddedFieldDefinition",
 64    # CustomPartitionRouter
 65    "CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
 66    # DynamicSchemaLoader
 67    "DynamicSchemaLoader.retriever": "SimpleRetriever",
 68    # SchemaTypeIdentifier
 69    "SchemaTypeIdentifier.types_map": "TypesMap",
 70}
 71
 72# We retain a separate registry for custom components to automatically insert the type if it is missing. This is intended to
 73# be a short term fix because once we have migrated, then type and class_name should be requirements for all custom components.
 74CUSTOM_COMPONENTS_MAPPING: Mapping[str, str] = {
 75    "CompositeErrorHandler.backoff_strategies": "CustomBackoffStrategy",
 76    "DeclarativeStream.retriever": "CustomRetriever",
 77    "DeclarativeStream.transformations": "CustomTransformation",
 78    "DefaultErrorHandler.backoff_strategies": "CustomBackoffStrategy",
 79    "DefaultPaginator.pagination_strategy": "CustomPaginationStrategy",
 80    "HttpRequester.authenticator": "CustomAuthenticator",
 81    "HttpRequester.error_handler": "CustomErrorHandler",
 82    "RecordSelector.extractor": "CustomRecordExtractor",
 83    "SimpleRetriever.partition_router": "CustomPartitionRouter",
 84}
 85
 86
 87class ManifestComponentTransformer:
 88    def propagate_types_and_parameters(
 89        self,
 90        parent_field_identifier: str,
 91        declarative_component: Mapping[str, Any],
 92        parent_parameters: Mapping[str, Any],
 93        use_parent_parameters: Optional[bool] = None,
 94    ) -> Dict[str, Any]:
 95        """
 96        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
 97        default component type if it was not already present. The resulting transformed components are a deep copy of the input
 98        components, not an in-place transformation.
 99
100        :param declarative_component: The current component that is having type and parameters added
101        :param parent_field_identifier: The name of the field of the current component coming from the parent component
102        :param parent_parameters: The parameters set on parent components defined before the current component
103        :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
104        :return: A deep copy of the transformed component with types and parameters persisted to it
105        """
106        propagated_component = dict(copy.deepcopy(declarative_component))
107        if "type" not in propagated_component:
108            # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
109            # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
110            # class_name will only be a valid field on custom components and this change reflects that. I checked, and we currently
111            # have no low-code connectors that use class_name except for custom components.
112            if "class_name" in propagated_component:
113                found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
114            else:
115                found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
116            if found_type:
117                propagated_component["type"] = found_type
118
119        # Combines parameters defined at the current level with parameters from parent components. Parameters at the current
120        # level take precedence
121        current_parameters = dict(copy.deepcopy(parent_parameters))
122        component_parameters = propagated_component.pop(PARAMETERS_STR, {})
123        current_parameters = (
124            {**component_parameters, **current_parameters}
125            if use_parent_parameters
126            else {**current_parameters, **component_parameters}
127        )
128
129        # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters
130        # When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could
131        # be json_schema are not objects but we believe this is not likely in our case because:
132        # * records are Mapping so objects hence SchemaLoader root should be an object
133        # * connection_specification is a Mapping
134        if self._is_json_schema_object(propagated_component):
135            return propagated_component
136
137        # For objects that don't have type check if their object fields have nested components which should have `$parameters` in it.
138        # For example, QueryProperties in requester.request_parameters, etc.
139        # Update propagated_component value with nested components with parent `$parameters` if needed and return propagated_component.
140        if "type" not in propagated_component:
141            if self._has_nested_components(propagated_component):
142                propagated_component = self._process_nested_components(
143                    propagated_component,
144                    parent_field_identifier,
145                    current_parameters,
146                    use_parent_parameters,
147                )
148            return propagated_component
149
150        # Parameters should be applied to the current component fields with the existing field taking precedence over parameters if
151        # both exist
152        for parameter_key, parameter_value in current_parameters.items():
153            propagated_component[parameter_key] = (
154                propagated_component.get(parameter_key) or parameter_value
155            )
156
157        for field_name, field_value in propagated_component.items():
158            if isinstance(field_value, dict):
159                # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle
160                excluded_parameter = current_parameters.pop(field_name, None)
161                parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}"
162                propagated_component[field_name] = self.propagate_types_and_parameters(
163                    parent_type_field_identifier,
164                    field_value,
165                    current_parameters,
166                    use_parent_parameters=use_parent_parameters,
167                )
168                if excluded_parameter:
169                    current_parameters[field_name] = excluded_parameter
170            elif isinstance(field_value, typing.List):
171                # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle
172                excluded_parameter = current_parameters.pop(field_name, None)
173                for i, element in enumerate(field_value):
174                    if isinstance(element, dict):
175                        parent_type_field_identifier = (
176                            f"{propagated_component.get('type')}.{field_name}"
177                        )
178                        field_value[i] = self.propagate_types_and_parameters(
179                            parent_type_field_identifier,
180                            element,
181                            current_parameters,
182                            use_parent_parameters=use_parent_parameters,
183                        )
184                if excluded_parameter:
185                    current_parameters[field_name] = excluded_parameter
186
187        if current_parameters:
188            propagated_component[PARAMETERS_STR] = current_parameters
189        return propagated_component
190
191    @staticmethod
192    def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
193        return propagated_component.get("type") == "object" or propagated_component.get("type") == [
194            "null",
195            "object",
196        ]
197
198    @staticmethod
199    def _has_nested_components(propagated_component: Dict[str, Any]) -> bool:
200        for k, v in propagated_component.items():
201            if isinstance(v, dict) and v.get("type"):
202                return True
203        return False
204
205    def _process_nested_components(
206        self,
207        propagated_component: Dict[str, Any],
208        parent_field_identifier: str,
209        current_parameters: Mapping[str, Any],
210        use_parent_parameters: Optional[bool] = None,
211    ) -> Dict[str, Any]:
212        for field_name, field_value in propagated_component.items():
213            if isinstance(field_value, dict) and field_value.get("type"):
214                nested_component_with_parameters = self.propagate_types_and_parameters(
215                    parent_field_identifier,
216                    field_value,
217                    current_parameters,
218                    use_parent_parameters=use_parent_parameters,
219                )
220                propagated_component[field_name] = nested_component_with_parameters
221
222        return propagated_component
PARAMETERS_STR = '$parameters'
DEFAULT_MODEL_TYPES: Mapping[str, str] = {'CompositeErrorHandler.error_handlers': 'DefaultErrorHandler', 'CursorPagination.decoder': 'JsonDecoder', 'DatetimeBasedCursor.end_datetime': 'MinMaxDatetime', 'DatetimeBasedCursor.end_time_option': 'RequestOption', 'DatetimeBasedCursor.start_datetime': 'MinMaxDatetime', 'DatetimeBasedCursor.start_time_option': 'RequestOption', 'DeclarativeSource.check': 'CheckStream', 'DeclarativeSource.spec': 'Spec', 'DeclarativeSource.streams': 'DeclarativeStream', 'DeclarativeStream.retriever': 'SimpleRetriever', 'DeclarativeStream.schema_loader': 'JsonFileSchemaLoader', 'DynamicDeclarativeStream.stream_template': 'DeclarativeStream', 'DynamicDeclarativeStream.components_resolver': 'ConfigComponentsResolver', 'HttpComponentsResolver.retriever': 'SimpleRetriever', 'HttpComponentsResolver.components_mapping': 'ComponentMappingDefinition', 'ConfigComponentsResolver.stream_config': 'StreamConfig', 'ConfigComponentsResolver.components_mapping': 'ComponentMappingDefinition', 'DefaultErrorHandler.response_filters': 'HttpResponseFilter', 'DefaultPaginator.decoder': 'JsonDecoder', 'DefaultPaginator.page_size_option': 'RequestOption', 'DpathExtractor.decoder': 'JsonDecoder', 'DpathExtractor.record_expander': 'RecordExpander', 'HttpRequester.error_handler': 'DefaultErrorHandler', 'ListPartitionRouter.request_option': 'RequestOption', 'ParentStreamConfig.request_option': 'RequestOption', 'ParentStreamConfig.stream': 'DeclarativeStream', 'RecordSelector.extractor': 'DpathExtractor', 'RecordSelector.record_filter': 'RecordFilter', 'SimpleRetriever.paginator': 'NoPagination', 'SimpleRetriever.record_selector': 'RecordSelector', 'SimpleRetriever.requester': 'HttpRequester', 'SubstreamPartitionRouter.parent_stream_configs': 'ParentStreamConfig', 'AddFields.fields': 'AddedFieldDefinition', 'CustomPartitionRouter.parent_stream_configs': 'ParentStreamConfig', 'DynamicSchemaLoader.retriever': 'SimpleRetriever', 'SchemaTypeIdentifier.types_map': 'TypesMap'}
CUSTOM_COMPONENTS_MAPPING: Mapping[str, str] = {'CompositeErrorHandler.backoff_strategies': 'CustomBackoffStrategy', 'DeclarativeStream.retriever': 'CustomRetriever', 'DeclarativeStream.transformations': 'CustomTransformation', 'DefaultErrorHandler.backoff_strategies': 'CustomBackoffStrategy', 'DefaultPaginator.pagination_strategy': 'CustomPaginationStrategy', 'HttpRequester.authenticator': 'CustomAuthenticator', 'HttpRequester.error_handler': 'CustomErrorHandler', 'RecordSelector.extractor': 'CustomRecordExtractor', 'SimpleRetriever.partition_router': 'CustomPartitionRouter'}
class ManifestComponentTransformer:
 88class ManifestComponentTransformer:
 89    def propagate_types_and_parameters(
 90        self,
 91        parent_field_identifier: str,
 92        declarative_component: Mapping[str, Any],
 93        parent_parameters: Mapping[str, Any],
 94        use_parent_parameters: Optional[bool] = None,
 95    ) -> Dict[str, Any]:
 96        """
 97        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
 98        default component type if it was not already present. The resulting transformed components are a deep copy of the input
 99        components, not an in-place transformation.
100
101        :param declarative_component: The current component that is having type and parameters added
102        :param parent_field_identifier: The name of the field of the current component coming from the parent component
103        :param parent_parameters: The parameters set on parent components defined before the current component
104        :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
105        :return: A deep copy of the transformed component with types and parameters persisted to it
106        """
107        propagated_component = dict(copy.deepcopy(declarative_component))
108        if "type" not in propagated_component:
109            # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
110            # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
111            # class_name will only be a valid field on custom components and this change reflects that. I checked, and we currently
112            # have no low-code connectors that use class_name except for custom components.
113            if "class_name" in propagated_component:
114                found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
115            else:
116                found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
117            if found_type:
118                propagated_component["type"] = found_type
119
120        # Combines parameters defined at the current level with parameters from parent components. Parameters at the current
121        # level take precedence
122        current_parameters = dict(copy.deepcopy(parent_parameters))
123        component_parameters = propagated_component.pop(PARAMETERS_STR, {})
124        current_parameters = (
125            {**component_parameters, **current_parameters}
126            if use_parent_parameters
127            else {**current_parameters, **component_parameters}
128        )
129
130        # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters
131        # When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could
132        # be json_schema are not objects but we believe this is not likely in our case because:
133        # * records are Mapping so objects hence SchemaLoader root should be an object
134        # * connection_specification is a Mapping
135        if self._is_json_schema_object(propagated_component):
136            return propagated_component
137
138        # For objects that don't have type check if their object fields have nested components which should have `$parameters` in it.
139        # For example, QueryProperties in requester.request_parameters, etc.
140        # Update propagated_component value with nested components with parent `$parameters` if needed and return propagated_component.
141        if "type" not in propagated_component:
142            if self._has_nested_components(propagated_component):
143                propagated_component = self._process_nested_components(
144                    propagated_component,
145                    parent_field_identifier,
146                    current_parameters,
147                    use_parent_parameters,
148                )
149            return propagated_component
150
151        # Parameters should be applied to the current component fields with the existing field taking precedence over parameters if
152        # both exist
153        for parameter_key, parameter_value in current_parameters.items():
154            propagated_component[parameter_key] = (
155                propagated_component.get(parameter_key) or parameter_value
156            )
157
158        for field_name, field_value in propagated_component.items():
159            if isinstance(field_value, dict):
160                # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle
161                excluded_parameter = current_parameters.pop(field_name, None)
162                parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}"
163                propagated_component[field_name] = self.propagate_types_and_parameters(
164                    parent_type_field_identifier,
165                    field_value,
166                    current_parameters,
167                    use_parent_parameters=use_parent_parameters,
168                )
169                if excluded_parameter:
170                    current_parameters[field_name] = excluded_parameter
171            elif isinstance(field_value, typing.List):
172                # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle
173                excluded_parameter = current_parameters.pop(field_name, None)
174                for i, element in enumerate(field_value):
175                    if isinstance(element, dict):
176                        parent_type_field_identifier = (
177                            f"{propagated_component.get('type')}.{field_name}"
178                        )
179                        field_value[i] = self.propagate_types_and_parameters(
180                            parent_type_field_identifier,
181                            element,
182                            current_parameters,
183                            use_parent_parameters=use_parent_parameters,
184                        )
185                if excluded_parameter:
186                    current_parameters[field_name] = excluded_parameter
187
188        if current_parameters:
189            propagated_component[PARAMETERS_STR] = current_parameters
190        return propagated_component
191
192    @staticmethod
193    def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
194        return propagated_component.get("type") == "object" or propagated_component.get("type") == [
195            "null",
196            "object",
197        ]
198
199    @staticmethod
200    def _has_nested_components(propagated_component: Dict[str, Any]) -> bool:
201        for k, v in propagated_component.items():
202            if isinstance(v, dict) and v.get("type"):
203                return True
204        return False
205
206    def _process_nested_components(
207        self,
208        propagated_component: Dict[str, Any],
209        parent_field_identifier: str,
210        current_parameters: Mapping[str, Any],
211        use_parent_parameters: Optional[bool] = None,
212    ) -> Dict[str, Any]:
213        for field_name, field_value in propagated_component.items():
214            if isinstance(field_value, dict) and field_value.get("type"):
215                nested_component_with_parameters = self.propagate_types_and_parameters(
216                    parent_field_identifier,
217                    field_value,
218                    current_parameters,
219                    use_parent_parameters=use_parent_parameters,
220                )
221                propagated_component[field_name] = nested_component_with_parameters
222
223        return propagated_component
def propagate_types_and_parameters( self, parent_field_identifier: str, declarative_component: Mapping[str, Any], parent_parameters: Mapping[str, Any], use_parent_parameters: Optional[bool] = None) -> Dict[str, Any]:
 89    def propagate_types_and_parameters(
 90        self,
 91        parent_field_identifier: str,
 92        declarative_component: Mapping[str, Any],
 93        parent_parameters: Mapping[str, Any],
 94        use_parent_parameters: Optional[bool] = None,
 95    ) -> Dict[str, Any]:
 96        """
 97        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
 98        default component type if it was not already present. The resulting transformed components are a deep copy of the input
 99        components, not an in-place transformation.
100
101        :param declarative_component: The current component that is having type and parameters added
102        :param parent_field_identifier: The name of the field of the current component coming from the parent component
103        :param parent_parameters: The parameters set on parent components defined before the current component
104        :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
105        :return: A deep copy of the transformed component with types and parameters persisted to it
106        """
107        propagated_component = dict(copy.deepcopy(declarative_component))
108        if "type" not in propagated_component:
109            # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
110            # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
111            # class_name will only be a valid field on custom components and this change reflects that. I checked, and we currently
112            # have no low-code connectors that use class_name except for custom components.
113            if "class_name" in propagated_component:
114                found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
115            else:
116                found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
117            if found_type:
118                propagated_component["type"] = found_type
119
120        # Combines parameters defined at the current level with parameters from parent components. Parameters at the current
121        # level take precedence
122        current_parameters = dict(copy.deepcopy(parent_parameters))
123        component_parameters = propagated_component.pop(PARAMETERS_STR, {})
124        current_parameters = (
125            {**component_parameters, **current_parameters}
126            if use_parent_parameters
127            else {**current_parameters, **component_parameters}
128        )
129
130        # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters
131        # When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could
132        # be json_schema are not objects but we believe this is not likely in our case because:
133        # * records are Mapping so objects hence SchemaLoader root should be an object
134        # * connection_specification is a Mapping
135        if self._is_json_schema_object(propagated_component):
136            return propagated_component
137
138        # For objects that don't have type check if their object fields have nested components which should have `$parameters` in it.
139        # For example, QueryProperties in requester.request_parameters, etc.
140        # Update propagated_component value with nested components with parent `$parameters` if needed and return propagated_component.
141        if "type" not in propagated_component:
142            if self._has_nested_components(propagated_component):
143                propagated_component = self._process_nested_components(
144                    propagated_component,
145                    parent_field_identifier,
146                    current_parameters,
147                    use_parent_parameters,
148                )
149            return propagated_component
150
151        # Parameters should be applied to the current component fields with the existing field taking precedence over parameters if
152        # both exist
153        for parameter_key, parameter_value in current_parameters.items():
154            propagated_component[parameter_key] = (
155                propagated_component.get(parameter_key) or parameter_value
156            )
157
158        for field_name, field_value in propagated_component.items():
159            if isinstance(field_value, dict):
160                # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle
161                excluded_parameter = current_parameters.pop(field_name, None)
162                parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}"
163                propagated_component[field_name] = self.propagate_types_and_parameters(
164                    parent_type_field_identifier,
165                    field_value,
166                    current_parameters,
167                    use_parent_parameters=use_parent_parameters,
168                )
169                if excluded_parameter:
170                    current_parameters[field_name] = excluded_parameter
171            elif isinstance(field_value, typing.List):
172                # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle
173                excluded_parameter = current_parameters.pop(field_name, None)
174                for i, element in enumerate(field_value):
175                    if isinstance(element, dict):
176                        parent_type_field_identifier = (
177                            f"{propagated_component.get('type')}.{field_name}"
178                        )
179                        field_value[i] = self.propagate_types_and_parameters(
180                            parent_type_field_identifier,
181                            element,
182                            current_parameters,
183                            use_parent_parameters=use_parent_parameters,
184                        )
185                if excluded_parameter:
186                    current_parameters[field_name] = excluded_parameter
187
188        if current_parameters:
189            propagated_component[PARAMETERS_STR] = current_parameters
190        return propagated_component

Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the default component type if it was not already present. The resulting transformed components are a deep copy of the input components, not an in-place transformation.

Parameters
  • declarative_component: The current component that is having type and parameters added
  • parent_field_identifier: The name of the field of the current component coming from the parent component
  • parent_parameters: The parameters set on parent components defined before the current component
  • use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
Returns

A deep copy of the transformed component with types and parameters persisted to it