airbyte_cdk.sources.declarative.parsers.manifest_component_transformer

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import copy
  6import typing
  7from typing import Any, Dict, Mapping, Optional
  8
  9PARAMETERS_STR = "$parameters"
 10
 11
 12DEFAULT_MODEL_TYPES: Mapping[str, str] = {
 13    # CompositeErrorHandler
 14    "CompositeErrorHandler.error_handlers": "DefaultErrorHandler",
 15    # CursorPagination
 16    "CursorPagination.decoder": "JsonDecoder",
 17    # DatetimeBasedCursor
 18    "DatetimeBasedCursor.end_datetime": "MinMaxDatetime",
 19    "DatetimeBasedCursor.end_time_option": "RequestOption",
 20    "DatetimeBasedCursor.start_datetime": "MinMaxDatetime",
 21    "DatetimeBasedCursor.start_time_option": "RequestOption",
 22    # DeclarativeSource
 23    "DeclarativeSource.check": "CheckStream",
 24    "DeclarativeSource.spec": "Spec",
 25    "DeclarativeSource.streams": "DeclarativeStream",
 26    # DeclarativeStream
 27    "DeclarativeStream.retriever": "SimpleRetriever",
 28    "DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
 29    # DynamicDeclarativeStream
 30    "DynamicDeclarativeStream.stream_template": "DeclarativeStream",
 31    "DynamicDeclarativeStream.components_resolver": "ConfigComponentResolver",
 32    # HttpComponentsResolver
 33    "HttpComponentsResolver.retriever": "SimpleRetriever",
 34    "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
 35    # ConfigComponentResolver
 36    "ConfigComponentsResolver.stream_config": "StreamConfig",
 37    "ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition",
 38    # DefaultErrorHandler
 39    "DefaultErrorHandler.response_filters": "HttpResponseFilter",
 40    # DefaultPaginator
 41    "DefaultPaginator.decoder": "JsonDecoder",
 42    "DefaultPaginator.page_size_option": "RequestOption",
 43    # DpathExtractor
 44    "DpathExtractor.decoder": "JsonDecoder",
 45    # HttpRequester
 46    "HttpRequester.error_handler": "DefaultErrorHandler",
 47    # ListPartitionRouter
 48    "ListPartitionRouter.request_option": "RequestOption",
 49    # ParentStreamConfig
 50    "ParentStreamConfig.request_option": "RequestOption",
 51    "ParentStreamConfig.stream": "DeclarativeStream",
 52    # RecordSelector
 53    "RecordSelector.extractor": "DpathExtractor",
 54    "RecordSelector.record_filter": "RecordFilter",
 55    # SimpleRetriever
 56    "SimpleRetriever.paginator": "NoPagination",
 57    "SimpleRetriever.record_selector": "RecordSelector",
 58    "SimpleRetriever.requester": "HttpRequester",
 59    # SubstreamPartitionRouter
 60    "SubstreamPartitionRouter.parent_stream_configs": "ParentStreamConfig",
 61    # AddFields
 62    "AddFields.fields": "AddedFieldDefinition",
 63    # CustomPartitionRouter
 64    "CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
 65    # DynamicSchemaLoader
 66    "DynamicSchemaLoader.retriever": "SimpleRetriever",
 67    # SchemaTypeIdentifier
 68    "SchemaTypeIdentifier.types_map": "TypesMap",
 69}
 70
 71# We retain a separate registry for custom components to automatically insert the type if it is missing. This is intended to
 72# be a short term fix because once we have migrated, then type and class_name should be requirements for all custom components.
 73CUSTOM_COMPONENTS_MAPPING: Mapping[str, str] = {
 74    "CompositeErrorHandler.backoff_strategies": "CustomBackoffStrategy",
 75    "DeclarativeStream.retriever": "CustomRetriever",
 76    "DeclarativeStream.transformations": "CustomTransformation",
 77    "DefaultErrorHandler.backoff_strategies": "CustomBackoffStrategy",
 78    "DefaultPaginator.pagination_strategy": "CustomPaginationStrategy",
 79    "HttpRequester.authenticator": "CustomAuthenticator",
 80    "HttpRequester.error_handler": "CustomErrorHandler",
 81    "RecordSelector.extractor": "CustomRecordExtractor",
 82    "SimpleRetriever.partition_router": "CustomPartitionRouter",
 83}
 84
 85
 86class ManifestComponentTransformer:
 87    def propagate_types_and_parameters(
 88        self,
 89        parent_field_identifier: str,
 90        declarative_component: Mapping[str, Any],
 91        parent_parameters: Mapping[str, Any],
 92        use_parent_parameters: Optional[bool] = None,
 93    ) -> Dict[str, Any]:
 94        """
 95        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
 96        default component type if it was not already present. The resulting transformed components are a deep copy of the input
 97        components, not an in-place transformation.
 98
 99        :param declarative_component: The current component that is having type and parameters added
100        :param parent_field_identifier: The name of the field of the current component coming from the parent component
101        :param parent_parameters: The parameters set on parent components defined before the current component
102        :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
103        :return: A deep copy of the transformed component with types and parameters persisted to it
104        """
105        propagated_component = dict(copy.deepcopy(declarative_component))
106        if "type" not in propagated_component:
107            # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
108            # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
109            # class_name will only be a valid field on custom components and this change reflects that. I checked, and we currently
110            # have no low-code connectors that use class_name except for custom components.
111            if "class_name" in propagated_component:
112                found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
113            else:
114                found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
115            if found_type:
116                propagated_component["type"] = found_type
117
118        # Combines parameters defined at the current level with parameters from parent components. Parameters at the current
119        # level take precedence
120        current_parameters = dict(copy.deepcopy(parent_parameters))
121        component_parameters = propagated_component.pop(PARAMETERS_STR, {})
122        current_parameters = (
123            {**component_parameters, **current_parameters}
124            if use_parent_parameters
125            else {**current_parameters, **component_parameters}
126        )
127
128        # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters
129        # When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could
130        # be json_schema are not objects but we believe this is not likely in our case because:
131        # * records are Mapping so objects hence SchemaLoader root should be an object
132        # * connection_specification is a Mapping
133        if self._is_json_schema_object(propagated_component):
134            return propagated_component
135
136        # For objects that don't have type check if their object fields have nested components which should have `$parameters` in it.
137        # For example, QueryProperties in requester.request_parameters, etc.
138        # Update propagated_component value with nested components with parent `$parameters` if needed and return propagated_component.
139        if "type" not in propagated_component:
140            if self._has_nested_components(propagated_component):
141                propagated_component = self._process_nested_components(
142                    propagated_component,
143                    parent_field_identifier,
144                    current_parameters,
145                    use_parent_parameters,
146                )
147            return propagated_component
148
149        # Parameters should be applied to the current component fields with the existing field taking precedence over parameters if
150        # both exist
151        for parameter_key, parameter_value in current_parameters.items():
152            propagated_component[parameter_key] = (
153                propagated_component.get(parameter_key) or parameter_value
154            )
155
156        for field_name, field_value in propagated_component.items():
157            if isinstance(field_value, dict):
158                # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle
159                excluded_parameter = current_parameters.pop(field_name, None)
160                parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}"
161                propagated_component[field_name] = self.propagate_types_and_parameters(
162                    parent_type_field_identifier,
163                    field_value,
164                    current_parameters,
165                    use_parent_parameters=use_parent_parameters,
166                )
167                if excluded_parameter:
168                    current_parameters[field_name] = excluded_parameter
169            elif isinstance(field_value, typing.List):
170                # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle
171                excluded_parameter = current_parameters.pop(field_name, None)
172                for i, element in enumerate(field_value):
173                    if isinstance(element, dict):
174                        parent_type_field_identifier = (
175                            f"{propagated_component.get('type')}.{field_name}"
176                        )
177                        field_value[i] = self.propagate_types_and_parameters(
178                            parent_type_field_identifier,
179                            element,
180                            current_parameters,
181                            use_parent_parameters=use_parent_parameters,
182                        )
183                if excluded_parameter:
184                    current_parameters[field_name] = excluded_parameter
185
186        if current_parameters:
187            propagated_component[PARAMETERS_STR] = current_parameters
188        return propagated_component
189
190    @staticmethod
191    def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
192        return propagated_component.get("type") == "object" or propagated_component.get("type") == [
193            "null",
194            "object",
195        ]
196
197    @staticmethod
198    def _has_nested_components(propagated_component: Dict[str, Any]) -> bool:
199        for k, v in propagated_component.items():
200            if isinstance(v, dict) and v.get("type"):
201                return True
202        return False
203
204    def _process_nested_components(
205        self,
206        propagated_component: Dict[str, Any],
207        parent_field_identifier: str,
208        current_parameters: Mapping[str, Any],
209        use_parent_parameters: Optional[bool] = None,
210    ) -> Dict[str, Any]:
211        for field_name, field_value in propagated_component.items():
212            if isinstance(field_value, dict) and field_value.get("type"):
213                nested_component_with_parameters = self.propagate_types_and_parameters(
214                    parent_field_identifier,
215                    field_value,
216                    current_parameters,
217                    use_parent_parameters=use_parent_parameters,
218                )
219                propagated_component[field_name] = nested_component_with_parameters
220
221        return propagated_component
# Key under which a component declares parameters that are propagated to its subcomponents.
PARAMETERS_STR = "$parameters"

# Default component type to insert when a subcomponent omits `type`, keyed by "<ParentComponentType>.<field_name>".
DEFAULT_MODEL_TYPES: Mapping[str, str] = {
    "CompositeErrorHandler.error_handlers": "DefaultErrorHandler",
    "CursorPagination.decoder": "JsonDecoder",
    "DatetimeBasedCursor.end_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.end_time_option": "RequestOption",
    "DatetimeBasedCursor.start_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.start_time_option": "RequestOption",
    "DeclarativeSource.check": "CheckStream",
    "DeclarativeSource.spec": "Spec",
    "DeclarativeSource.streams": "DeclarativeStream",
    "DeclarativeStream.retriever": "SimpleRetriever",
    "DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
    "DynamicDeclarativeStream.stream_template": "DeclarativeStream",
    # Must match the "ConfigComponentsResolver.*" keys below so the resolver's own subcomponent defaults resolve.
    "DynamicDeclarativeStream.components_resolver": "ConfigComponentsResolver",
    "HttpComponentsResolver.retriever": "SimpleRetriever",
    "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
    "ConfigComponentsResolver.stream_config": "StreamConfig",
    "ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition",
    "DefaultErrorHandler.response_filters": "HttpResponseFilter",
    "DefaultPaginator.decoder": "JsonDecoder",
    "DefaultPaginator.page_size_option": "RequestOption",
    "DpathExtractor.decoder": "JsonDecoder",
    "HttpRequester.error_handler": "DefaultErrorHandler",
    "ListPartitionRouter.request_option": "RequestOption",
    "ParentStreamConfig.request_option": "RequestOption",
    "ParentStreamConfig.stream": "DeclarativeStream",
    "RecordSelector.extractor": "DpathExtractor",
    "RecordSelector.record_filter": "RecordFilter",
    "SimpleRetriever.paginator": "NoPagination",
    "SimpleRetriever.record_selector": "RecordSelector",
    "SimpleRetriever.requester": "HttpRequester",
    "SubstreamPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    "AddFields.fields": "AddedFieldDefinition",
    "CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    "DynamicSchemaLoader.retriever": "SimpleRetriever",
    "SchemaTypeIdentifier.types_map": "TypesMap",
}

# Type to insert for custom components (those declaring class_name) that omit `type`, keyed the same way.
CUSTOM_COMPONENTS_MAPPING: Mapping[str, str] = {
    "CompositeErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DeclarativeStream.retriever": "CustomRetriever",
    "DeclarativeStream.transformations": "CustomTransformation",
    "DefaultErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DefaultPaginator.pagination_strategy": "CustomPaginationStrategy",
    "HttpRequester.authenticator": "CustomAuthenticator",
    "HttpRequester.error_handler": "CustomErrorHandler",
    "RecordSelector.extractor": "CustomRecordExtractor",
    "SimpleRetriever.partition_router": "CustomPartitionRouter",
}
class ManifestComponentTransformer:
 87class ManifestComponentTransformer:
 88    def propagate_types_and_parameters(
 89        self,
 90        parent_field_identifier: str,
 91        declarative_component: Mapping[str, Any],
 92        parent_parameters: Mapping[str, Any],
 93        use_parent_parameters: Optional[bool] = None,
 94    ) -> Dict[str, Any]:
 95        """
 96        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
 97        default component type if it was not already present. The resulting transformed components are a deep copy of the input
 98        components, not an in-place transformation.
 99
100        :param declarative_component: The current component that is having type and parameters added
101        :param parent_field_identifier: The name of the field of the current component coming from the parent component
102        :param parent_parameters: The parameters set on parent components defined before the current component
103        :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
104        :return: A deep copy of the transformed component with types and parameters persisted to it
105        """
106        propagated_component = dict(copy.deepcopy(declarative_component))
107        if "type" not in propagated_component:
108            # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
109            # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
110            # class_name will only be a valid field on custom components and this change reflects that. I checked, and we currently
111            # have no low-code connectors that use class_name except for custom components.
112            if "class_name" in propagated_component:
113                found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
114            else:
115                found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
116            if found_type:
117                propagated_component["type"] = found_type
118
119        # Combines parameters defined at the current level with parameters from parent components. Parameters at the current
120        # level take precedence
121        current_parameters = dict(copy.deepcopy(parent_parameters))
122        component_parameters = propagated_component.pop(PARAMETERS_STR, {})
123        current_parameters = (
124            {**component_parameters, **current_parameters}
125            if use_parent_parameters
126            else {**current_parameters, **component_parameters}
127        )
128
129        # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters
130        # When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could
131        # be json_schema are not objects but we believe this is not likely in our case because:
132        # * records are Mapping so objects hence SchemaLoader root should be an object
133        # * connection_specification is a Mapping
134        if self._is_json_schema_object(propagated_component):
135            return propagated_component
136
137        # For objects that don't have type check if their object fields have nested components which should have `$parameters` in it.
138        # For example, QueryProperties in requester.request_parameters, etc.
139        # Update propagated_component value with nested components with parent `$parameters` if needed and return propagated_component.
140        if "type" not in propagated_component:
141            if self._has_nested_components(propagated_component):
142                propagated_component = self._process_nested_components(
143                    propagated_component,
144                    parent_field_identifier,
145                    current_parameters,
146                    use_parent_parameters,
147                )
148            return propagated_component
149
150        # Parameters should be applied to the current component fields with the existing field taking precedence over parameters if
151        # both exist
152        for parameter_key, parameter_value in current_parameters.items():
153            propagated_component[parameter_key] = (
154                propagated_component.get(parameter_key) or parameter_value
155            )
156
157        for field_name, field_value in propagated_component.items():
158            if isinstance(field_value, dict):
159                # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle
160                excluded_parameter = current_parameters.pop(field_name, None)
161                parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}"
162                propagated_component[field_name] = self.propagate_types_and_parameters(
163                    parent_type_field_identifier,
164                    field_value,
165                    current_parameters,
166                    use_parent_parameters=use_parent_parameters,
167                )
168                if excluded_parameter:
169                    current_parameters[field_name] = excluded_parameter
170            elif isinstance(field_value, typing.List):
171                # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle
172                excluded_parameter = current_parameters.pop(field_name, None)
173                for i, element in enumerate(field_value):
174                    if isinstance(element, dict):
175                        parent_type_field_identifier = (
176                            f"{propagated_component.get('type')}.{field_name}"
177                        )
178                        field_value[i] = self.propagate_types_and_parameters(
179                            parent_type_field_identifier,
180                            element,
181                            current_parameters,
182                            use_parent_parameters=use_parent_parameters,
183                        )
184                if excluded_parameter:
185                    current_parameters[field_name] = excluded_parameter
186
187        if current_parameters:
188            propagated_component[PARAMETERS_STR] = current_parameters
189        return propagated_component
190
191    @staticmethod
192    def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
193        return propagated_component.get("type") == "object" or propagated_component.get("type") == [
194            "null",
195            "object",
196        ]
197
198    @staticmethod
199    def _has_nested_components(propagated_component: Dict[str, Any]) -> bool:
200        for k, v in propagated_component.items():
201            if isinstance(v, dict) and v.get("type"):
202                return True
203        return False
204
205    def _process_nested_components(
206        self,
207        propagated_component: Dict[str, Any],
208        parent_field_identifier: str,
209        current_parameters: Mapping[str, Any],
210        use_parent_parameters: Optional[bool] = None,
211    ) -> Dict[str, Any]:
212        for field_name, field_value in propagated_component.items():
213            if isinstance(field_value, dict) and field_value.get("type"):
214                nested_component_with_parameters = self.propagate_types_and_parameters(
215                    parent_field_identifier,
216                    field_value,
217                    current_parameters,
218                    use_parent_parameters=use_parent_parameters,
219                )
220                propagated_component[field_name] = nested_component_with_parameters
221
222        return propagated_component
def propagate_types_and_parameters( self, parent_field_identifier: str, declarative_component: Mapping[str, Any], parent_parameters: Mapping[str, Any], use_parent_parameters: Optional[bool] = None) -> Dict[str, Any]:
 88    def propagate_types_and_parameters(
 89        self,
 90        parent_field_identifier: str,
 91        declarative_component: Mapping[str, Any],
 92        parent_parameters: Mapping[str, Any],
 93        use_parent_parameters: Optional[bool] = None,
 94    ) -> Dict[str, Any]:
 95        """
 96        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
 97        default component type if it was not already present. The resulting transformed components are a deep copy of the input
 98        components, not an in-place transformation.
 99
100        :param declarative_component: The current component that is having type and parameters added
101        :param parent_field_identifier: The name of the field of the current component coming from the parent component
102        :param parent_parameters: The parameters set on parent components defined before the current component
103        :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
104        :return: A deep copy of the transformed component with types and parameters persisted to it
105        """
106        propagated_component = dict(copy.deepcopy(declarative_component))
107        if "type" not in propagated_component:
108            # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
109            # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
110            # class_name will only be a valid field on custom components and this change reflects that. I checked, and we currently
111            # have no low-code connectors that use class_name except for custom components.
112            if "class_name" in propagated_component:
113                found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
114            else:
115                found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
116            if found_type:
117                propagated_component["type"] = found_type
118
119        # Combines parameters defined at the current level with parameters from parent components. Parameters at the current
120        # level take precedence
121        current_parameters = dict(copy.deepcopy(parent_parameters))
122        component_parameters = propagated_component.pop(PARAMETERS_STR, {})
123        current_parameters = (
124            {**component_parameters, **current_parameters}
125            if use_parent_parameters
126            else {**current_parameters, **component_parameters}
127        )
128
129        # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters
130        # When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could
131        # be json_schema are not objects but we believe this is not likely in our case because:
132        # * records are Mapping so objects hence SchemaLoader root should be an object
133        # * connection_specification is a Mapping
134        if self._is_json_schema_object(propagated_component):
135            return propagated_component
136
137        # For objects that don't have type check if their object fields have nested components which should have `$parameters` in it.
138        # For example, QueryProperties in requester.request_parameters, etc.
139        # Update propagated_component value with nested components with parent `$parameters` if needed and return propagated_component.
140        if "type" not in propagated_component:
141            if self._has_nested_components(propagated_component):
142                propagated_component = self._process_nested_components(
143                    propagated_component,
144                    parent_field_identifier,
145                    current_parameters,
146                    use_parent_parameters,
147                )
148            return propagated_component
149
150        # Parameters should be applied to the current component fields with the existing field taking precedence over parameters if
151        # both exist
152        for parameter_key, parameter_value in current_parameters.items():
153            propagated_component[parameter_key] = (
154                propagated_component.get(parameter_key) or parameter_value
155            )
156
157        for field_name, field_value in propagated_component.items():
158            if isinstance(field_value, dict):
159                # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle
160                excluded_parameter = current_parameters.pop(field_name, None)
161                parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}"
162                propagated_component[field_name] = self.propagate_types_and_parameters(
163                    parent_type_field_identifier,
164                    field_value,
165                    current_parameters,
166                    use_parent_parameters=use_parent_parameters,
167                )
168                if excluded_parameter:
169                    current_parameters[field_name] = excluded_parameter
170            elif isinstance(field_value, typing.List):
171                # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle
172                excluded_parameter = current_parameters.pop(field_name, None)
173                for i, element in enumerate(field_value):
174                    if isinstance(element, dict):
175                        parent_type_field_identifier = (
176                            f"{propagated_component.get('type')}.{field_name}"
177                        )
178                        field_value[i] = self.propagate_types_and_parameters(
179                            parent_type_field_identifier,
180                            element,
181                            current_parameters,
182                            use_parent_parameters=use_parent_parameters,
183                        )
184                if excluded_parameter:
185                    current_parameters[field_name] = excluded_parameter
186
187        if current_parameters:
188            propagated_component[PARAMETERS_STR] = current_parameters
189        return propagated_component

Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the default component type if it was not already present. The resulting transformed components are a deep copy of the input components, not an in-place transformation.

Parameters
  • declarative_component: The current component that is having type and parameters added
  • parent_field_identifier: The name of the field of the current component coming from the parent component
  • parent_parameters: The parameters set on parent components defined before the current component
  • use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
Returns

A deep copy of the transformed component with types and parameters persisted to it