airbyte_cdk.sources.declarative.parsers.manifest_component_transformer
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
import typing
from typing import Any, Dict, Mapping, Optional

PARAMETERS_STR = "$parameters"


# Maps "<ParentType>.<field_name>" to the component type inserted when a component held by that field omits `type`.
DEFAULT_MODEL_TYPES: Mapping[str, str] = {
    # CompositeErrorHandler
    "CompositeErrorHandler.error_handlers": "DefaultErrorHandler",
    # CursorPagination
    "CursorPagination.decoder": "JsonDecoder",
    # DatetimeBasedCursor
    "DatetimeBasedCursor.end_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.end_time_option": "RequestOption",
    "DatetimeBasedCursor.start_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.start_time_option": "RequestOption",
    # DeclarativeSource
    "DeclarativeSource.check": "CheckStream",
    "DeclarativeSource.spec": "Spec",
    "DeclarativeSource.streams": "DeclarativeStream",
    # DeclarativeStream
    "DeclarativeStream.retriever": "SimpleRetriever",
    "DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
    # DynamicDeclarativeStream
    "DynamicDeclarativeStream.stream_template": "DeclarativeStream",
    # Must match the "ConfigComponentsResolver.*" keys below so the resolver's own fields also get default types.
    "DynamicDeclarativeStream.components_resolver": "ConfigComponentsResolver",
    # HttpComponentsResolver
    "HttpComponentsResolver.retriever": "SimpleRetriever",
    "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
    # ConfigComponentsResolver
    "ConfigComponentsResolver.stream_config": "StreamConfig",
    "ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition",
    # DefaultErrorHandler
    "DefaultErrorHandler.response_filters": "HttpResponseFilter",
    # DefaultPaginator
    "DefaultPaginator.decoder": "JsonDecoder",
    "DefaultPaginator.page_size_option": "RequestOption",
    # DpathExtractor
    "DpathExtractor.decoder": "JsonDecoder",
    "DpathExtractor.record_expander": "RecordExpander",
    # HttpRequester
    "HttpRequester.error_handler": "DefaultErrorHandler",
    # ListPartitionRouter
    "ListPartitionRouter.request_option": "RequestOption",
    # ParentStreamConfig
    "ParentStreamConfig.request_option": "RequestOption",
    "ParentStreamConfig.stream": "DeclarativeStream",
    # RecordSelector
    "RecordSelector.extractor": "DpathExtractor",
    "RecordSelector.record_filter": "RecordFilter",
    # SimpleRetriever
    "SimpleRetriever.paginator": "NoPagination",
    "SimpleRetriever.record_selector": "RecordSelector",
    "SimpleRetriever.requester": "HttpRequester",
    # SubstreamPartitionRouter
    "SubstreamPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    # AddFields
    "AddFields.fields": "AddedFieldDefinition",
    # CustomPartitionRouter
    "CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    # DynamicSchemaLoader
    "DynamicSchemaLoader.retriever": "SimpleRetriever",
    # SchemaTypeIdentifier
    "SchemaTypeIdentifier.types_map": "TypesMap",
}

# We retain a separate registry for custom components to automatically insert the type if it is missing. This is intended to
# be a short term fix because once we have migrated, then type and class_name should be requirements for all custom components.
CUSTOM_COMPONENTS_MAPPING: Mapping[str, str] = {
    "CompositeErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DeclarativeStream.retriever": "CustomRetriever",
    "DeclarativeStream.transformations": "CustomTransformation",
    "DefaultErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DefaultPaginator.pagination_strategy": "CustomPaginationStrategy",
    "HttpRequester.authenticator": "CustomAuthenticator",
    "HttpRequester.error_handler": "CustomErrorHandler",
    "RecordSelector.extractor": "CustomRecordExtractor",
    "SimpleRetriever.partition_router": "CustomPartitionRouter",
}


class ManifestComponentTransformer:
    def propagate_types_and_parameters(
        self,
        parent_field_identifier: str,
        declarative_component: Mapping[str, Any],
        parent_parameters: Mapping[str, Any],
        use_parent_parameters: Optional[bool] = None,
    ) -> Dict[str, Any]:
        """
        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
        default component type if it was not already present. The resulting transformed components are a deep copy of the input
        components, not an in-place transformation.

        :param parent_field_identifier: For nested components, "<ParentType>.<field_name>" of the parent field holding the current
            component. It is the key used to look up a default type in DEFAULT_MODEL_TYPES (or CUSTOM_COMPONENTS_MAPPING when the
            component has a class_name) if the component does not declare a type.
        :param declarative_component: The current component that is having type and parameters added
        :param parent_parameters: The parameters set on parent components defined before the current component
        :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same;
            otherwise the component's own $parameters take precedence
        :return: A deep copy of the transformed component with types and parameters persisted to it
        """
        propagated_component = dict(copy.deepcopy(declarative_component))
        if "type" not in propagated_component:
            # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
            # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
            # class_name will only be a valid field on custom components and this change reflects that. I checked, and we currently
            # have no low-code connectors that use class_name except for custom components.
            if "class_name" in propagated_component:
                found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
            else:
                found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
            if found_type:
                propagated_component["type"] = found_type

        # Combine the parameters defined at the current level with those inherited from parent components. By default the
        # current level wins on key collisions; with use_parent_parameters the inherited values win instead.
        current_parameters = dict(copy.deepcopy(parent_parameters))
        component_parameters = propagated_component.pop(PARAMETERS_STR, {})
        current_parameters = (
            {**component_parameters, **current_parameters}
            if use_parent_parameters
            else {**current_parameters, **component_parameters}
        )

        # A dict whose type is a JSON schema object type is a schema, not a component, so it is returned untouched. This check is
        # imperfect since there could be JSON schemas that are not objects, but we believe this is unlikely in our case because:
        # * records are Mapping so objects hence SchemaLoader root should be an object
        # * connection_specification is a Mapping
        if self._is_json_schema_object(propagated_component):
            return propagated_component

        # Without a resolved type this is a regular object rather than a component, so parameters are not applied to its fields.
        # Its direct fields may still hold typed components (for example, QueryProperties in requester.request_parameters), and
        # those receive the current `$parameters`.
        if "type" not in propagated_component:
            if self._has_nested_components(propagated_component):
                propagated_component = self._process_nested_components(
                    propagated_component,
                    parent_field_identifier,
                    current_parameters,
                    use_parent_parameters,
                )
            return propagated_component

        # Parameters should be applied to the current component fields with the existing field taking precedence over
        # parameters if both exist.
        # NOTE: because of `or`, a falsy existing value (None, 0, "", [], {}, False) is replaced by a same-named parameter.
        for parameter_key, parameter_value in current_parameters.items():
            propagated_component[parameter_key] = (
                propagated_component.get(parameter_key) or parameter_value
            )

        for field_name, field_value in propagated_component.items():
            if isinstance(field_value, dict):
                # We exclude propagating a parameter that matches the current field name because that would result in an
                # infinite cycle. A filtered copy is passed (rather than popping and restoring) so that a falsy parameter
                # value such as [] or "" is not silently dropped for later sibling fields or this component's `$parameters`.
                field_parameters = {
                    key: value for key, value in current_parameters.items() if key != field_name
                }
                propagated_component[field_name] = self.propagate_types_and_parameters(
                    f"{propagated_component.get('type')}.{field_name}",
                    field_value,
                    field_parameters,
                    use_parent_parameters=use_parent_parameters,
                )
            elif isinstance(field_value, typing.List):
                # Same exclusion as for dict fields, applied to every dict element of the list.
                field_parameters = {
                    key: value for key, value in current_parameters.items() if key != field_name
                }
                element_identifier = f"{propagated_component.get('type')}.{field_name}"
                for index, element in enumerate(field_value):
                    if isinstance(element, dict):
                        field_value[index] = self.propagate_types_and_parameters(
                            element_identifier,
                            element,
                            field_parameters,
                            use_parent_parameters=use_parent_parameters,
                        )

        if current_parameters:
            propagated_component[PARAMETERS_STR] = current_parameters
        return propagated_component

    @staticmethod
    def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
        """Return True when the dict's "type" is "object" or ["null", "object"], i.e. it looks like a JSON schema object."""
        return propagated_component.get("type") == "object" or propagated_component.get("type") == [
            "null",
            "object",
        ]

    @staticmethod
    def _has_nested_components(propagated_component: Dict[str, Any]) -> bool:
        """Return True when any direct field value is a dict with a truthy "type", i.e. a typed nested component."""
        return any(
            isinstance(value, dict) and bool(value.get("type"))
            for value in propagated_component.values()
        )

    def _process_nested_components(
        self,
        propagated_component: Dict[str, Any],
        parent_field_identifier: str,
        current_parameters: Mapping[str, Any],
        use_parent_parameters: Optional[bool] = None,
    ) -> Dict[str, Any]:
        """
        Propagate `current_parameters` into every direct field of an untyped object that holds a typed component.

        The object is updated in place and also returned. The nested components already declare a type, so
        `parent_field_identifier` is only forwarded and is not used for a default-type lookup.
        """
        for field_name, field_value in propagated_component.items():
            if isinstance(field_value, dict) and field_value.get("type"):
                propagated_component[field_name] = self.propagate_types_and_parameters(
                    parent_field_identifier,
                    field_value,
                    current_parameters,
                    use_parent_parameters=use_parent_parameters,
                )

        return propagated_component
PARAMETERS_STR =
'$parameters'
DEFAULT_MODEL_TYPES: Mapping[str, str] =
{'CompositeErrorHandler.error_handlers': 'DefaultErrorHandler', 'CursorPagination.decoder': 'JsonDecoder', 'DatetimeBasedCursor.end_datetime': 'MinMaxDatetime', 'DatetimeBasedCursor.end_time_option': 'RequestOption', 'DatetimeBasedCursor.start_datetime': 'MinMaxDatetime', 'DatetimeBasedCursor.start_time_option': 'RequestOption', 'DeclarativeSource.check': 'CheckStream', 'DeclarativeSource.spec': 'Spec', 'DeclarativeSource.streams': 'DeclarativeStream', 'DeclarativeStream.retriever': 'SimpleRetriever', 'DeclarativeStream.schema_loader': 'JsonFileSchemaLoader', 'DynamicDeclarativeStream.stream_template': 'DeclarativeStream', 'DynamicDeclarativeStream.components_resolver': 'ConfigComponentResolver', 'HttpComponentsResolver.retriever': 'SimpleRetriever', 'HttpComponentsResolver.components_mapping': 'ComponentMappingDefinition', 'ConfigComponentsResolver.stream_config': 'StreamConfig', 'ConfigComponentsResolver.components_mapping': 'ComponentMappingDefinition', 'DefaultErrorHandler.response_filters': 'HttpResponseFilter', 'DefaultPaginator.decoder': 'JsonDecoder', 'DefaultPaginator.page_size_option': 'RequestOption', 'DpathExtractor.decoder': 'JsonDecoder', 'DpathExtractor.record_expander': 'RecordExpander', 'HttpRequester.error_handler': 'DefaultErrorHandler', 'ListPartitionRouter.request_option': 'RequestOption', 'ParentStreamConfig.request_option': 'RequestOption', 'ParentStreamConfig.stream': 'DeclarativeStream', 'RecordSelector.extractor': 'DpathExtractor', 'RecordSelector.record_filter': 'RecordFilter', 'SimpleRetriever.paginator': 'NoPagination', 'SimpleRetriever.record_selector': 'RecordSelector', 'SimpleRetriever.requester': 'HttpRequester', 'SubstreamPartitionRouter.parent_stream_configs': 'ParentStreamConfig', 'AddFields.fields': 'AddedFieldDefinition', 'CustomPartitionRouter.parent_stream_configs': 'ParentStreamConfig', 'DynamicSchemaLoader.retriever': 'SimpleRetriever', 'SchemaTypeIdentifier.types_map': 'TypesMap'}
CUSTOM_COMPONENTS_MAPPING: Mapping[str, str] =
{'CompositeErrorHandler.backoff_strategies': 'CustomBackoffStrategy', 'DeclarativeStream.retriever': 'CustomRetriever', 'DeclarativeStream.transformations': 'CustomTransformation', 'DefaultErrorHandler.backoff_strategies': 'CustomBackoffStrategy', 'DefaultPaginator.pagination_strategy': 'CustomPaginationStrategy', 'HttpRequester.authenticator': 'CustomAuthenticator', 'HttpRequester.error_handler': 'CustomErrorHandler', 'RecordSelector.extractor': 'CustomRecordExtractor', 'SimpleRetriever.partition_router': 'CustomPartitionRouter'}
class
ManifestComponentTransformer:
class ManifestComponentTransformer:
    def propagate_types_and_parameters(
        self,
        parent_field_identifier: str,
        declarative_component: Mapping[str, Any],
        parent_parameters: Mapping[str, Any],
        use_parent_parameters: Optional[bool] = None,
    ) -> Dict[str, Any]:
        """
        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
        default component type if it was not already present. The resulting transformed components are a deep copy of the input
        components, not an in-place transformation.

        :param parent_field_identifier: For nested components, "<ParentType>.<field_name>" of the parent field holding the current
            component. It is the key used to look up a default type in DEFAULT_MODEL_TYPES (or CUSTOM_COMPONENTS_MAPPING when the
            component has a class_name) if the component does not declare a type.
        :param declarative_component: The current component that is having type and parameters added
        :param parent_parameters: The parameters set on parent components defined before the current component
        :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same;
            otherwise the component's own $parameters take precedence
        :return: A deep copy of the transformed component with types and parameters persisted to it
        """
        propagated_component = dict(copy.deepcopy(declarative_component))
        if "type" not in propagated_component:
            # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
            # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
            # class_name will only be a valid field on custom components and this change reflects that. I checked, and we currently
            # have no low-code connectors that use class_name except for custom components.
            if "class_name" in propagated_component:
                found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
            else:
                found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
            if found_type:
                propagated_component["type"] = found_type

        # Combine the parameters defined at the current level with those inherited from parent components. By default the
        # current level wins on key collisions; with use_parent_parameters the inherited values win instead.
        current_parameters = dict(copy.deepcopy(parent_parameters))
        component_parameters = propagated_component.pop(PARAMETERS_STR, {})
        current_parameters = (
            {**component_parameters, **current_parameters}
            if use_parent_parameters
            else {**current_parameters, **component_parameters}
        )

        # A dict whose type is a JSON schema object type is a schema, not a component, so it is returned untouched. This check is
        # imperfect since there could be JSON schemas that are not objects, but we believe this is unlikely in our case because:
        # * records are Mapping so objects hence SchemaLoader root should be an object
        # * connection_specification is a Mapping
        if self._is_json_schema_object(propagated_component):
            return propagated_component

        # Without a resolved type this is a regular object rather than a component, so parameters are not applied to its fields.
        # Its direct fields may still hold typed components (for example, QueryProperties in requester.request_parameters), and
        # those receive the current `$parameters`.
        if "type" not in propagated_component:
            if self._has_nested_components(propagated_component):
                propagated_component = self._process_nested_components(
                    propagated_component,
                    parent_field_identifier,
                    current_parameters,
                    use_parent_parameters,
                )
            return propagated_component

        # Parameters should be applied to the current component fields with the existing field taking precedence over
        # parameters if both exist.
        # NOTE: because of `or`, a falsy existing value (None, 0, "", [], {}, False) is replaced by a same-named parameter.
        for parameter_key, parameter_value in current_parameters.items():
            propagated_component[parameter_key] = (
                propagated_component.get(parameter_key) or parameter_value
            )

        for field_name, field_value in propagated_component.items():
            if isinstance(field_value, dict):
                # We exclude propagating a parameter that matches the current field name because that would result in an
                # infinite cycle. A filtered copy is passed (rather than popping and restoring) so that a falsy parameter
                # value such as [] or "" is not silently dropped for later sibling fields or this component's `$parameters`.
                field_parameters = {
                    key: value for key, value in current_parameters.items() if key != field_name
                }
                propagated_component[field_name] = self.propagate_types_and_parameters(
                    f"{propagated_component.get('type')}.{field_name}",
                    field_value,
                    field_parameters,
                    use_parent_parameters=use_parent_parameters,
                )
            elif isinstance(field_value, typing.List):
                # Same exclusion as for dict fields, applied to every dict element of the list.
                field_parameters = {
                    key: value for key, value in current_parameters.items() if key != field_name
                }
                element_identifier = f"{propagated_component.get('type')}.{field_name}"
                for index, element in enumerate(field_value):
                    if isinstance(element, dict):
                        field_value[index] = self.propagate_types_and_parameters(
                            element_identifier,
                            element,
                            field_parameters,
                            use_parent_parameters=use_parent_parameters,
                        )

        if current_parameters:
            propagated_component[PARAMETERS_STR] = current_parameters
        return propagated_component

    @staticmethod
    def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
        """Return True when the dict's "type" is "object" or ["null", "object"], i.e. it looks like a JSON schema object."""
        return propagated_component.get("type") == "object" or propagated_component.get("type") == [
            "null",
            "object",
        ]

    @staticmethod
    def _has_nested_components(propagated_component: Dict[str, Any]) -> bool:
        """Return True when any direct field value is a dict with a truthy "type", i.e. a typed nested component."""
        return any(
            isinstance(value, dict) and bool(value.get("type"))
            for value in propagated_component.values()
        )

    def _process_nested_components(
        self,
        propagated_component: Dict[str, Any],
        parent_field_identifier: str,
        current_parameters: Mapping[str, Any],
        use_parent_parameters: Optional[bool] = None,
    ) -> Dict[str, Any]:
        """
        Propagate `current_parameters` into every direct field of an untyped object that holds a typed component.

        The object is updated in place and also returned. The nested components already declare a type, so
        `parent_field_identifier` is only forwarded and is not used for a default-type lookup.
        """
        for field_name, field_value in propagated_component.items():
            if isinstance(field_value, dict) and field_value.get("type"):
                propagated_component[field_name] = self.propagate_types_and_parameters(
                    parent_field_identifier,
                    field_value,
                    current_parameters,
                    use_parent_parameters=use_parent_parameters,
                )

        return propagated_component
def
propagate_types_and_parameters( self, parent_field_identifier: str, declarative_component: Mapping[str, Any], parent_parameters: Mapping[str, Any], use_parent_parameters: Optional[bool] = None) -> Dict[str, Any]:
def propagate_types_and_parameters(
    self,
    parent_field_identifier: str,
    declarative_component: Mapping[str, Any],
    parent_parameters: Mapping[str, Any],
    use_parent_parameters: Optional[bool] = None,
) -> Dict[str, Any]:
    """
    Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
    default component type if it was not already present. The resulting transformed components are a deep copy of the input
    components, not an in-place transformation.

    :param parent_field_identifier: For nested components, "<ParentType>.<field_name>" of the parent field holding the current
        component. It is the key used to look up a default type in DEFAULT_MODEL_TYPES (or CUSTOM_COMPONENTS_MAPPING when the
        component has a class_name) if the component does not declare a type.
    :param declarative_component: The current component that is having type and parameters added
    :param parent_parameters: The parameters set on parent components defined before the current component
    :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same;
        otherwise the component's own $parameters take precedence
    :return: A deep copy of the transformed component with types and parameters persisted to it
    """
    propagated_component = dict(copy.deepcopy(declarative_component))
    if "type" not in propagated_component:
        # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
        # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
        # class_name will only be a valid field on custom components and this change reflects that. I checked, and we currently
        # have no low-code connectors that use class_name except for custom components.
        if "class_name" in propagated_component:
            found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
        else:
            found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
        if found_type:
            propagated_component["type"] = found_type

    # Combine the parameters defined at the current level with those inherited from parent components. By default the
    # current level wins on key collisions; with use_parent_parameters the inherited values win instead.
    current_parameters = dict(copy.deepcopy(parent_parameters))
    component_parameters = propagated_component.pop(PARAMETERS_STR, {})
    current_parameters = (
        {**component_parameters, **current_parameters}
        if use_parent_parameters
        else {**current_parameters, **component_parameters}
    )

    # A dict whose type is a JSON schema object type is a schema, not a component, so it is returned untouched. This check is
    # imperfect since there could be JSON schemas that are not objects, but we believe this is unlikely in our case because:
    # * records are Mapping so objects hence SchemaLoader root should be an object
    # * connection_specification is a Mapping
    if self._is_json_schema_object(propagated_component):
        return propagated_component

    # Without a resolved type this is a regular object rather than a component, so parameters are not applied to its fields.
    # Its direct fields may still hold typed components (for example, QueryProperties in requester.request_parameters), and
    # those receive the current `$parameters`.
    if "type" not in propagated_component:
        if self._has_nested_components(propagated_component):
            propagated_component = self._process_nested_components(
                propagated_component,
                parent_field_identifier,
                current_parameters,
                use_parent_parameters,
            )
        return propagated_component

    # Parameters should be applied to the current component fields with the existing field taking precedence over
    # parameters if both exist.
    # NOTE: because of `or`, a falsy existing value (None, 0, "", [], {}, False) is replaced by a same-named parameter.
    for parameter_key, parameter_value in current_parameters.items():
        propagated_component[parameter_key] = (
            propagated_component.get(parameter_key) or parameter_value
        )

    for field_name, field_value in propagated_component.items():
        if isinstance(field_value, dict):
            # We exclude propagating a parameter that matches the current field name because that would result in an
            # infinite cycle. A filtered copy is passed (rather than popping and restoring) so that a falsy parameter
            # value such as [] or "" is not silently dropped for later sibling fields or this component's `$parameters`.
            field_parameters = {
                key: value for key, value in current_parameters.items() if key != field_name
            }
            propagated_component[field_name] = self.propagate_types_and_parameters(
                f"{propagated_component.get('type')}.{field_name}",
                field_value,
                field_parameters,
                use_parent_parameters=use_parent_parameters,
            )
        elif isinstance(field_value, typing.List):
            # Same exclusion as for dict fields, applied to every dict element of the list.
            field_parameters = {
                key: value for key, value in current_parameters.items() if key != field_name
            }
            element_identifier = f"{propagated_component.get('type')}.{field_name}"
            for index, element in enumerate(field_value):
                if isinstance(element, dict):
                    field_value[index] = self.propagate_types_and_parameters(
                        element_identifier,
                        element,
                        field_parameters,
                        use_parent_parameters=use_parent_parameters,
                    )

    if current_parameters:
        propagated_component[PARAMETERS_STR] = current_parameters
    return propagated_component
Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the default component type if it was not already present. The resulting transformed components are a deep copy of the input components, not an in-place transformation.
Parameters
- declarative_component: The current component that is having type and parameters added
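- parent_field_identifier: The identifier of the parent component's field that holds the current component; for nested components it has the form "<ParentType>.<field_name>" and is used to look up a default type when the current component does not declare one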
- parent_parameters: The parameters set on parent components defined before the current component
- use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
Returns
A deep copy of the transformed component with types and parameters persisted to it