airbyte_cdk.sources.declarative.parsers.manifest_component_transformer
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
import typing
from typing import Any, Dict, Mapping, Optional

PARAMETERS_STR = "$parameters"


# Maps "<ParentType>.<field_name>" to the component type inserted when a component found in that field does not
# declare a "type" itself.
DEFAULT_MODEL_TYPES: Mapping[str, str] = {
    # CompositeErrorHandler
    "CompositeErrorHandler.error_handlers": "DefaultErrorHandler",
    # CursorPagination
    "CursorPagination.decoder": "JsonDecoder",
    # DatetimeBasedCursor
    "DatetimeBasedCursor.end_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.end_time_option": "RequestOption",
    "DatetimeBasedCursor.start_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.start_time_option": "RequestOption",
    # DeclarativeSource
    "DeclarativeSource.check": "CheckStream",
    "DeclarativeSource.spec": "Spec",
    "DeclarativeSource.streams": "DeclarativeStream",
    # DeclarativeStream
    "DeclarativeStream.retriever": "SimpleRetriever",
    "DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
    # DynamicDeclarativeStream
    "DynamicDeclarativeStream.stream_template": "DeclarativeStream",
    # Must match the "ConfigComponentsResolver.*" keys below so nested defaults are resolved.
    "DynamicDeclarativeStream.components_resolver": "ConfigComponentsResolver",
    # HttpComponentsResolver
    "HttpComponentsResolver.retriever": "SimpleRetriever",
    "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
    # ConfigComponentsResolver
    "ConfigComponentsResolver.stream_config": "StreamConfig",
    "ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition",
    # DefaultErrorHandler
    "DefaultErrorHandler.response_filters": "HttpResponseFilter",
    # DefaultPaginator
    "DefaultPaginator.decoder": "JsonDecoder",
    "DefaultPaginator.page_size_option": "RequestOption",
    # DpathExtractor
    "DpathExtractor.decoder": "JsonDecoder",
    # HttpRequester
    "HttpRequester.error_handler": "DefaultErrorHandler",
    # ListPartitionRouter
    "ListPartitionRouter.request_option": "RequestOption",
    # ParentStreamConfig
    "ParentStreamConfig.request_option": "RequestOption",
    "ParentStreamConfig.stream": "DeclarativeStream",
    # RecordSelector
    "RecordSelector.extractor": "DpathExtractor",
    "RecordSelector.record_filter": "RecordFilter",
    # SimpleRetriever
    "SimpleRetriever.paginator": "NoPagination",
    "SimpleRetriever.record_selector": "RecordSelector",
    "SimpleRetriever.requester": "HttpRequester",
    # SubstreamPartitionRouter
    "SubstreamPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    # AddFields
    "AddFields.fields": "AddedFieldDefinition",
    # CustomPartitionRouter
    "CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    # DynamicSchemaLoader
    "DynamicSchemaLoader.retriever": "SimpleRetriever",
    # SchemaTypeIdentifier
    "SchemaTypeIdentifier.types_map": "TypesMap",
}

# We retain a separate registry for custom components to automatically insert the type if it is missing. This is intended to
# be a short term fix because once we have migrated, then type and class_name should be requirements for all custom components.
CUSTOM_COMPONENTS_MAPPING: Mapping[str, str] = {
    "CompositeErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DeclarativeStream.retriever": "CustomRetriever",
    "DeclarativeStream.transformations": "CustomTransformation",
    "DefaultErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DefaultPaginator.pagination_strategy": "CustomPaginationStrategy",
    "HttpRequester.authenticator": "CustomAuthenticator",
    "HttpRequester.error_handler": "CustomErrorHandler",
    "RecordSelector.extractor": "CustomRecordExtractor",
    "SimpleRetriever.partition_router": "CustomPartitionRouter",
}


class ManifestComponentTransformer:
    def propagate_types_and_parameters(
        self,
        parent_field_identifier: str,
        declarative_component: Mapping[str, Any],
        parent_parameters: Mapping[str, Any],
        use_parent_parameters: Optional[bool] = None,
    ) -> Dict[str, Any]:
        """
        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
        default component type if it was not already present. The resulting transformed components are a deep copy of the input
        components, not an in-place transformation.

        :param declarative_component: The current component that is having type and parameters added
        :param parent_field_identifier: Identifier of the field holding the current component, formatted as
            "<ParentType>.<field_name>"; used to look up a default type in DEFAULT_MODEL_TYPES (or in
            CUSTOM_COMPONENTS_MAPPING when the component declares a class_name)
        :param parent_parameters: The parameters set on parent components defined before the current component
        :param use_parent_parameters: If truthy, parent parameters take precedence over the component's own
            `$parameters` when keys collide; otherwise the component's own parameters win
        :return: A deep copy of the transformed component with types and parameters persisted to it
        """
        propagated_component = dict(copy.deepcopy(declarative_component))
        if "type" not in propagated_component:
            # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
            # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
            # class_name will only be a valid field on custom components and this change reflects that.
            if "class_name" in propagated_component:
                found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
            else:
                found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
            if found_type:
                propagated_component["type"] = found_type

        # Combine parameters inherited from parent components with the ones declared on this component. By default the
        # component's own parameters take precedence; use_parent_parameters flips that precedence.
        current_parameters = dict(copy.deepcopy(parent_parameters))
        component_parameters = propagated_component.pop(PARAMETERS_STR, {})
        current_parameters = (
            {**component_parameters, **current_parameters}
            if use_parent_parameters
            else {**current_parameters, **component_parameters}
        )

        # When the type refers to a JSON schema we're not processing a component. This check is imperfect because there could
        # be JSON schemas that are not objects, but we believe this is unlikely in our case because:
        # * records are Mapping so objects hence SchemaLoader root should be an object
        # * connection_specification is a Mapping
        if self._is_json_schema_object(propagated_component):
            return propagated_component

        # When there is no resolved type we're not processing a component (likely a regular object), so parameters are not
        # applied to it. Its direct fields may still hold typed components (e.g. QueryProperties in
        # requester.request_parameters), which do receive the current `$parameters`.
        if "type" not in propagated_component:
            if self._has_nested_components(propagated_component):
                propagated_component = self._process_nested_components(
                    propagated_component,
                    parent_field_identifier,
                    current_parameters,
                    use_parent_parameters,
                )
            return propagated_component

        # Parameters are applied to the current component's fields, with the existing field taking precedence.
        # NOTE(review): because of `or`, an existing falsy value (None, "", 0, False, [], {}) is replaced by the parameter.
        for parameter_key, parameter_value in current_parameters.items():
            propagated_component[parameter_key] = (
                propagated_component.get(parameter_key) or parameter_value
            )

        for field_name, field_value in propagated_component.items():
            if not isinstance(field_value, (dict, list)):
                continue

            # Exclude a parameter named like the current field while processing that field; propagating it into its
            # own subtree would result in an infinite cycle. Membership is tracked explicitly so that falsy parameter
            # values are restored too (restoring only truthy ones silently dropped e.g. "" or 0).
            has_excluded_parameter = field_name in current_parameters
            excluded_parameter = current_parameters.pop(field_name, None)
            parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}"

            if isinstance(field_value, dict):
                propagated_component[field_name] = self.propagate_types_and_parameters(
                    parent_type_field_identifier,
                    field_value,
                    current_parameters,
                    use_parent_parameters=use_parent_parameters,
                )
            else:
                # field_value belongs to our deep copy, so its elements can be replaced in place.
                for i, element in enumerate(field_value):
                    if isinstance(element, dict):
                        field_value[i] = self.propagate_types_and_parameters(
                            parent_type_field_identifier,
                            element,
                            current_parameters,
                            use_parent_parameters=use_parent_parameters,
                        )

            if has_excluded_parameter:
                current_parameters[field_name] = excluded_parameter

        if current_parameters:
            propagated_component[PARAMETERS_STR] = current_parameters
        return propagated_component

    @staticmethod
    def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
        """Return True when the mapping's "type" is "object" or ["null", "object"], i.e. it looks like a JSON schema object."""
        return propagated_component.get("type") in ("object", ["null", "object"])

    @staticmethod
    def _has_nested_components(propagated_component: Dict[str, Any]) -> bool:
        """Return True if any direct value is a dict with a truthy "type", i.e. a nested component."""
        return any(
            isinstance(value, dict) and value.get("type")
            for value in propagated_component.values()
        )

    def _process_nested_components(
        self,
        propagated_component: Dict[str, Any],
        parent_field_identifier: str,
        current_parameters: Mapping[str, Any],
        use_parent_parameters: Optional[bool] = None,
    ) -> Dict[str, Any]:
        """
        Propagate `current_parameters` into every direct field of an untyped object that holds a typed component.

        The object is updated in place and returned. The nested components already carry a type, so
        parent_field_identifier is only forwarded, not used for type lookup.
        """
        for field_name, field_value in propagated_component.items():
            if isinstance(field_value, dict) and field_value.get("type"):
                propagated_component[field_name] = self.propagate_types_and_parameters(
                    parent_field_identifier,
                    field_value,
                    current_parameters,
                    use_parent_parameters=use_parent_parameters,
                )

        return propagated_component
# Manifest key under which a component declares parameters shared with its subcomponents.
PARAMETERS_STR = "$parameters"
# Maps "<ParentType>.<field_name>" to the component type inserted when a component found in that field does not
# declare a "type" itself.
DEFAULT_MODEL_TYPES: Mapping[str, str] = {
    "CompositeErrorHandler.error_handlers": "DefaultErrorHandler",
    "CursorPagination.decoder": "JsonDecoder",
    "DatetimeBasedCursor.end_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.end_time_option": "RequestOption",
    "DatetimeBasedCursor.start_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.start_time_option": "RequestOption",
    "DeclarativeSource.check": "CheckStream",
    "DeclarativeSource.spec": "Spec",
    "DeclarativeSource.streams": "DeclarativeStream",
    "DeclarativeStream.retriever": "SimpleRetriever",
    "DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
    "DynamicDeclarativeStream.stream_template": "DeclarativeStream",
    # Must match the "ConfigComponentsResolver.*" keys below so nested defaults are resolved.
    "DynamicDeclarativeStream.components_resolver": "ConfigComponentsResolver",
    "HttpComponentsResolver.retriever": "SimpleRetriever",
    "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
    "ConfigComponentsResolver.stream_config": "StreamConfig",
    "ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition",
    "DefaultErrorHandler.response_filters": "HttpResponseFilter",
    "DefaultPaginator.decoder": "JsonDecoder",
    "DefaultPaginator.page_size_option": "RequestOption",
    "DpathExtractor.decoder": "JsonDecoder",
    "HttpRequester.error_handler": "DefaultErrorHandler",
    "ListPartitionRouter.request_option": "RequestOption",
    "ParentStreamConfig.request_option": "RequestOption",
    "ParentStreamConfig.stream": "DeclarativeStream",
    "RecordSelector.extractor": "DpathExtractor",
    "RecordSelector.record_filter": "RecordFilter",
    "SimpleRetriever.paginator": "NoPagination",
    "SimpleRetriever.record_selector": "RecordSelector",
    "SimpleRetriever.requester": "HttpRequester",
    "SubstreamPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    "AddFields.fields": "AddedFieldDefinition",
    "CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    "DynamicSchemaLoader.retriever": "SimpleRetriever",
    "SchemaTypeIdentifier.types_map": "TypesMap",
}
# Short-term registry mapping "<ParentType>.<field_name>" to the custom component type that is inserted when a
# component in that field declares a class_name but no type.
CUSTOM_COMPONENTS_MAPPING: Mapping[str, str] = {
    # Error handling
    "CompositeErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    # Streams
    "DeclarativeStream.retriever": "CustomRetriever",
    "DeclarativeStream.transformations": "CustomTransformation",
    # Error handling (default handler)
    "DefaultErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    # Pagination
    "DefaultPaginator.pagination_strategy": "CustomPaginationStrategy",
    # Requester
    "HttpRequester.authenticator": "CustomAuthenticator",
    "HttpRequester.error_handler": "CustomErrorHandler",
    # Record selection
    "RecordSelector.extractor": "CustomRecordExtractor",
    # Partitioning
    "SimpleRetriever.partition_router": "CustomPartitionRouter",
}
class
ManifestComponentTransformer:
class ManifestComponentTransformer:
    def propagate_types_and_parameters(
        self,
        parent_field_identifier: str,
        declarative_component: Mapping[str, Any],
        parent_parameters: Mapping[str, Any],
        use_parent_parameters: Optional[bool] = None,
    ) -> Dict[str, Any]:
        """
        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
        default component type if it was not already present. The resulting transformed components are a deep copy of the input
        components, not an in-place transformation.

        :param declarative_component: The current component that is having type and parameters added
        :param parent_field_identifier: Identifier of the field holding the current component, formatted as
            "<ParentType>.<field_name>"; used to look up a default type in DEFAULT_MODEL_TYPES (or in
            CUSTOM_COMPONENTS_MAPPING when the component declares a class_name)
        :param parent_parameters: The parameters set on parent components defined before the current component
        :param use_parent_parameters: If truthy, parent parameters take precedence over the component's own
            `$parameters` when keys collide; otherwise the component's own parameters win
        :return: A deep copy of the transformed component with types and parameters persisted to it
        """
        propagated_component = dict(copy.deepcopy(declarative_component))
        if "type" not in propagated_component:
            # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
            # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
            # class_name will only be a valid field on custom components and this change reflects that.
            if "class_name" in propagated_component:
                found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
            else:
                found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
            if found_type:
                propagated_component["type"] = found_type

        # Combine parameters inherited from parent components with the ones declared on this component. By default the
        # component's own parameters take precedence; use_parent_parameters flips that precedence.
        current_parameters = dict(copy.deepcopy(parent_parameters))
        component_parameters = propagated_component.pop(PARAMETERS_STR, {})
        current_parameters = (
            {**component_parameters, **current_parameters}
            if use_parent_parameters
            else {**current_parameters, **component_parameters}
        )

        # When the type refers to a JSON schema we're not processing a component. This check is imperfect because there could
        # be JSON schemas that are not objects, but we believe this is unlikely in our case because:
        # * records are Mapping so objects hence SchemaLoader root should be an object
        # * connection_specification is a Mapping
        if self._is_json_schema_object(propagated_component):
            return propagated_component

        # When there is no resolved type we're not processing a component (likely a regular object), so parameters are not
        # applied to it. Its direct fields may still hold typed components (e.g. QueryProperties in
        # requester.request_parameters), which do receive the current `$parameters`.
        if "type" not in propagated_component:
            if self._has_nested_components(propagated_component):
                propagated_component = self._process_nested_components(
                    propagated_component,
                    parent_field_identifier,
                    current_parameters,
                    use_parent_parameters,
                )
            return propagated_component

        # Parameters are applied to the current component's fields, with the existing field taking precedence.
        # NOTE(review): because of `or`, an existing falsy value (None, "", 0, False, [], {}) is replaced by the parameter.
        for parameter_key, parameter_value in current_parameters.items():
            propagated_component[parameter_key] = (
                propagated_component.get(parameter_key) or parameter_value
            )

        for field_name, field_value in propagated_component.items():
            if not isinstance(field_value, (dict, list)):
                continue

            # Exclude a parameter named like the current field while processing that field; propagating it into its
            # own subtree would result in an infinite cycle. Membership is tracked explicitly so that falsy parameter
            # values are restored too (restoring only truthy ones silently dropped e.g. "" or 0).
            has_excluded_parameter = field_name in current_parameters
            excluded_parameter = current_parameters.pop(field_name, None)
            parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}"

            if isinstance(field_value, dict):
                propagated_component[field_name] = self.propagate_types_and_parameters(
                    parent_type_field_identifier,
                    field_value,
                    current_parameters,
                    use_parent_parameters=use_parent_parameters,
                )
            else:
                # field_value belongs to our deep copy, so its elements can be replaced in place.
                for i, element in enumerate(field_value):
                    if isinstance(element, dict):
                        field_value[i] = self.propagate_types_and_parameters(
                            parent_type_field_identifier,
                            element,
                            current_parameters,
                            use_parent_parameters=use_parent_parameters,
                        )

            if has_excluded_parameter:
                current_parameters[field_name] = excluded_parameter

        if current_parameters:
            propagated_component[PARAMETERS_STR] = current_parameters
        return propagated_component

    @staticmethod
    def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
        """Return True when the mapping's "type" is "object" or ["null", "object"], i.e. it looks like a JSON schema object."""
        return propagated_component.get("type") in ("object", ["null", "object"])

    @staticmethod
    def _has_nested_components(propagated_component: Dict[str, Any]) -> bool:
        """Return True if any direct value is a dict with a truthy "type", i.e. a nested component."""
        return any(
            isinstance(value, dict) and value.get("type")
            for value in propagated_component.values()
        )

    def _process_nested_components(
        self,
        propagated_component: Dict[str, Any],
        parent_field_identifier: str,
        current_parameters: Mapping[str, Any],
        use_parent_parameters: Optional[bool] = None,
    ) -> Dict[str, Any]:
        """
        Propagate `current_parameters` into every direct field of an untyped object that holds a typed component.

        The object is updated in place and returned. The nested components already carry a type, so
        parent_field_identifier is only forwarded, not used for type lookup.
        """
        for field_name, field_value in propagated_component.items():
            if isinstance(field_value, dict) and field_value.get("type"):
                propagated_component[field_name] = self.propagate_types_and_parameters(
                    parent_field_identifier,
                    field_value,
                    current_parameters,
                    use_parent_parameters=use_parent_parameters,
                )

        return propagated_component
def
propagate_types_and_parameters( self, parent_field_identifier: str, declarative_component: Mapping[str, Any], parent_parameters: Mapping[str, Any], use_parent_parameters: Optional[bool] = None) -> Dict[str, Any]:
def propagate_types_and_parameters(
    self,
    parent_field_identifier: str,
    declarative_component: Mapping[str, Any],
    parent_parameters: Mapping[str, Any],
    use_parent_parameters: Optional[bool] = None,
) -> Dict[str, Any]:
    """
    Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
    default component type if it was not already present. The resulting transformed components are a deep copy of the input
    components, not an in-place transformation.

    :param declarative_component: The current component that is having type and parameters added
    :param parent_field_identifier: Identifier of the field holding the current component, formatted as
        "<ParentType>.<field_name>"; used to look up a default type in DEFAULT_MODEL_TYPES (or in
        CUSTOM_COMPONENTS_MAPPING when the component declares a class_name)
    :param parent_parameters: The parameters set on parent components defined before the current component
    :param use_parent_parameters: If truthy, parent parameters take precedence over the component's own
        `$parameters` when keys collide; otherwise the component's own parameters win
    :return: A deep copy of the transformed component with types and parameters persisted to it
    """
    propagated_component = dict(copy.deepcopy(declarative_component))
    if "type" not in propagated_component:
        # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
        # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
        # class_name will only be a valid field on custom components and this change reflects that.
        if "class_name" in propagated_component:
            found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
        else:
            found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
        if found_type:
            propagated_component["type"] = found_type

    # Combine parameters inherited from parent components with the ones declared on this component. By default the
    # component's own parameters take precedence; use_parent_parameters flips that precedence.
    current_parameters = dict(copy.deepcopy(parent_parameters))
    component_parameters = propagated_component.pop(PARAMETERS_STR, {})
    current_parameters = (
        {**component_parameters, **current_parameters}
        if use_parent_parameters
        else {**current_parameters, **component_parameters}
    )

    # When the type refers to a JSON schema we're not processing a component. This check is imperfect because there could
    # be JSON schemas that are not objects, but we believe this is unlikely in our case because:
    # * records are Mapping so objects hence SchemaLoader root should be an object
    # * connection_specification is a Mapping
    if self._is_json_schema_object(propagated_component):
        return propagated_component

    # When there is no resolved type we're not processing a component (likely a regular object), so parameters are not
    # applied to it. Its direct fields may still hold typed components (e.g. QueryProperties in
    # requester.request_parameters), which do receive the current `$parameters`.
    if "type" not in propagated_component:
        if self._has_nested_components(propagated_component):
            propagated_component = self._process_nested_components(
                propagated_component,
                parent_field_identifier,
                current_parameters,
                use_parent_parameters,
            )
        return propagated_component

    # Parameters are applied to the current component's fields, with the existing field taking precedence.
    # NOTE(review): because of `or`, an existing falsy value (None, "", 0, False, [], {}) is replaced by the parameter.
    for parameter_key, parameter_value in current_parameters.items():
        propagated_component[parameter_key] = (
            propagated_component.get(parameter_key) or parameter_value
        )

    for field_name, field_value in propagated_component.items():
        if not isinstance(field_value, (dict, list)):
            continue

        # Exclude a parameter named like the current field while processing that field; propagating it into its
        # own subtree would result in an infinite cycle. Membership is tracked explicitly so that falsy parameter
        # values are restored too (restoring only truthy ones silently dropped e.g. "" or 0).
        has_excluded_parameter = field_name in current_parameters
        excluded_parameter = current_parameters.pop(field_name, None)
        parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}"

        if isinstance(field_value, dict):
            propagated_component[field_name] = self.propagate_types_and_parameters(
                parent_type_field_identifier,
                field_value,
                current_parameters,
                use_parent_parameters=use_parent_parameters,
            )
        else:
            # field_value belongs to our deep copy, so its elements can be replaced in place.
            for i, element in enumerate(field_value):
                if isinstance(element, dict):
                    field_value[i] = self.propagate_types_and_parameters(
                        parent_type_field_identifier,
                        element,
                        current_parameters,
                        use_parent_parameters=use_parent_parameters,
                    )

        if has_excluded_parameter:
            current_parameters[field_name] = excluded_parameter

    if current_parameters:
        propagated_component[PARAMETERS_STR] = current_parameters
    return propagated_component
Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the default component type if it was not already present. The resulting transformed components are a deep copy of the input components, not an in-place transformation.
Parameters
- declarative_component: The current component that is having type and parameters added
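- parent_field_identifier: Identifier of the field that holds the current component, formatted as `<ParentType>.<field_name>`; it is used to look up the default (or custom) component type when the component does not declare one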
- parent_parameters: The parameters set on parent components defined before the current component
- use_parent_parameters: If set, parent parameters take precedence over the component's own `$parameters` when keys collide; otherwise the component's own parameters win
Returns
A deep copy of the transformed component with types and parameters persisted to it