airbyte_cdk.sources.declarative.parsers.manifest_component_transformer
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
import typing
from typing import Any, Dict, Mapping, Optional

PARAMETERS_STR = "$parameters"


# Default `type` inserted for a component that declares neither `type` nor `class_name`, keyed by
# "<ParentType>.<field_name>" of the field that holds the component.
DEFAULT_MODEL_TYPES: Mapping[str, str] = {
    # CompositeErrorHandler
    "CompositeErrorHandler.error_handlers": "DefaultErrorHandler",
    # CursorPagination
    "CursorPagination.decoder": "JsonDecoder",
    # DatetimeBasedCursor
    "DatetimeBasedCursor.end_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.end_time_option": "RequestOption",
    "DatetimeBasedCursor.start_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.start_time_option": "RequestOption",
    # CustomIncrementalSync
    "CustomIncrementalSync.end_datetime": "MinMaxDatetime",
    "CustomIncrementalSync.end_time_option": "RequestOption",
    "CustomIncrementalSync.start_datetime": "MinMaxDatetime",
    "CustomIncrementalSync.start_time_option": "RequestOption",
    # DeclarativeSource
    "DeclarativeSource.check": "CheckStream",
    "DeclarativeSource.spec": "Spec",
    "DeclarativeSource.streams": "DeclarativeStream",
    # DeclarativeStream
    "DeclarativeStream.retriever": "SimpleRetriever",
    "DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
    # DynamicDeclarativeStream
    "DynamicDeclarativeStream.stream_template": "DeclarativeStream",
    # Must match the "ConfigComponentsResolver.*" keys below so defaults chain to the resolver's own fields
    "DynamicDeclarativeStream.components_resolver": "ConfigComponentsResolver",
    # HttpComponentsResolver
    "HttpComponentsResolver.retriever": "SimpleRetriever",
    "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
    # ConfigComponentsResolver
    "ConfigComponentsResolver.stream_config": "StreamConfig",
    "ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition",
    # DefaultErrorHandler
    "DefaultErrorHandler.response_filters": "HttpResponseFilter",
    # DefaultPaginator
    "DefaultPaginator.decoder": "JsonDecoder",
    "DefaultPaginator.page_size_option": "RequestOption",
    # DpathExtractor
    "DpathExtractor.decoder": "JsonDecoder",
    # HttpRequester
    "HttpRequester.error_handler": "DefaultErrorHandler",
    # ListPartitionRouter
    "ListPartitionRouter.request_option": "RequestOption",
    # ParentStreamConfig
    "ParentStreamConfig.request_option": "RequestOption",
    "ParentStreamConfig.stream": "DeclarativeStream",
    # RecordSelector
    "RecordSelector.extractor": "DpathExtractor",
    "RecordSelector.record_filter": "RecordFilter",
    # SimpleRetriever
    "SimpleRetriever.paginator": "NoPagination",
    "SimpleRetriever.record_selector": "RecordSelector",
    "SimpleRetriever.requester": "HttpRequester",
    # SubstreamPartitionRouter
    "SubstreamPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    # AddFields
    "AddFields.fields": "AddedFieldDefinition",
    # CustomPartitionRouter
    "CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    # DynamicSchemaLoader
    "DynamicSchemaLoader.retriever": "SimpleRetriever",
    # SchemaTypeIdentifier
    "SchemaTypeIdentifier.types_map": "TypesMap",
}

# We retain a separate registry for custom components to automatically insert the type if it is missing. This is intended to
# be a short term fix because once we have migrated, then type and class_name should be requirements for all custom components.
CUSTOM_COMPONENTS_MAPPING: Mapping[str, str] = {
    "CompositeErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DeclarativeStream.retriever": "CustomRetriever",
    "DeclarativeStream.transformations": "CustomTransformation",
    "DefaultErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DefaultPaginator.pagination_strategy": "CustomPaginationStrategy",
    "HttpRequester.authenticator": "CustomAuthenticator",
    "HttpRequester.error_handler": "CustomErrorHandler",
    "RecordSelector.extractor": "CustomRecordExtractor",
    "SimpleRetriever.partition_router": "CustomPartitionRouter",
}


class ManifestComponentTransformer:
    """Inserts default component types and propagates `$parameters` through a declarative manifest."""

    def propagate_types_and_parameters(
        self,
        parent_field_identifier: str,
        declarative_component: Mapping[str, Any],
        parent_parameters: Mapping[str, Any],
        use_parent_parameters: Optional[bool] = None,
    ) -> Dict[str, Any]:
        """
        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
        default component type if it was not already present. The resulting transformed components are a deep copy of the input
        components, not an in-place transformation.

        :param parent_field_identifier: The "<ParentType>.<field_name>" key (e.g. "SimpleRetriever.requester") used to look up
            a default type when the current component does not declare one
        :param declarative_component: The current component that is having type and parameters added
        :param parent_parameters: The parameters set on parent components defined before the current component
        :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
        :return: A deep copy of the transformed component with types and parameters persisted to it
        """
        propagated_component = dict(copy.deepcopy(declarative_component))
        if "type" not in propagated_component:
            # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
            # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
            # class_name will only be a valid field on custom components and this change reflects that. I checked, and we currently
            # have no low-code connectors that use class_name except for custom components.
            if "class_name" in propagated_component:
                found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
            else:
                found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
            if found_type:
                propagated_component["type"] = found_type

        # Combines parameters defined at the current level with parameters from parent components. Parameters at the current
        # level take precedence unless use_parent_parameters is set. An empty `$parameters:` entry (None) is treated as {}.
        inherited_parameters = dict(copy.deepcopy(parent_parameters))
        component_parameters = propagated_component.pop(PARAMETERS_STR, None) or {}
        current_parameters = (
            {**component_parameters, **inherited_parameters}
            if use_parent_parameters
            else {**inherited_parameters, **component_parameters}
        )

        # When the type refers to a json schema, we're not processing a component. This check is currently imperfect as there
        # could be json_schemas that are not objects, but we believe this is not likely in our case because:
        # * records are Mapping so objects hence SchemaLoader root should be an object
        # * connection_specification is a Mapping
        if self._is_json_schema_object(propagated_component):
            return propagated_component

        # When there is no resolved type, we're not processing a component (likely a regular object) and don't propagate
        # parameters onto it. Its object fields may still hold nested components (for example, QueryProperties in
        # requester.request_parameters), which receive the current `$parameters`.
        if "type" not in propagated_component:
            if self._has_nested_components(propagated_component):
                propagated_component = self._process_nested_components(
                    propagated_component,
                    parent_field_identifier,
                    current_parameters,
                    use_parent_parameters,
                )
            return propagated_component

        # Parameters are applied to the current component's fields, with an existing field taking precedence. Only missing or
        # None fields are filled in, so falsy values (False, 0, "") set explicitly on the component are preserved.
        for parameter_key, parameter_value in current_parameters.items():
            if propagated_component.get(parameter_key) is None:
                propagated_component[parameter_key] = parameter_value

        component_type = propagated_component.get("type")
        for field_name, field_value in propagated_component.items():
            if not isinstance(field_value, (dict, list)):
                continue
            # We exclude propagating a parameter that matches the current field name because that would result in an infinite
            # cycle. Filtering into a new dict (rather than popping and restoring) guarantees that falsy parameter values are
            # not lost for later fields or for this component's own `$parameters`.
            child_parameters = {
                key: value for key, value in current_parameters.items() if key != field_name
            }
            child_field_identifier = f"{component_type}.{field_name}"
            if isinstance(field_value, dict):
                propagated_component[field_name] = self.propagate_types_and_parameters(
                    child_field_identifier,
                    field_value,
                    child_parameters,
                    use_parent_parameters=use_parent_parameters,
                )
            else:
                for index, element in enumerate(field_value):
                    if isinstance(element, dict):
                        field_value[index] = self.propagate_types_and_parameters(
                            child_field_identifier,
                            element,
                            child_parameters,
                            use_parent_parameters=use_parent_parameters,
                        )

        if current_parameters:
            propagated_component[PARAMETERS_STR] = current_parameters
        return propagated_component

    @staticmethod
    def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
        """Return True when the component's "type" is "object" or ["null", "object"] (a JSON schema object)."""
        return propagated_component.get("type") == "object" or propagated_component.get("type") == [
            "null",
            "object",
        ]

    @staticmethod
    def _has_nested_components(propagated_component: Dict[str, Any]) -> bool:
        """Return True if any direct field of the component is a dict with a truthy "type"."""
        return any(
            isinstance(value, dict) and bool(value.get("type"))
            for value in propagated_component.values()
        )

    def _process_nested_components(
        self,
        propagated_component: Dict[str, Any],
        parent_field_identifier: str,
        current_parameters: Mapping[str, Any],
        use_parent_parameters: Optional[bool] = None,
    ) -> Dict[str, Any]:
        """
        Run propagate_types_and_parameters on every direct field that is a dict with a truthy "type", replacing that field
        in place, and return the same (mutated) dict.
        """
        for field_name, field_value in propagated_component.items():
            if isinstance(field_value, dict) and field_value.get("type"):
                propagated_component[field_name] = self.propagate_types_and_parameters(
                    parent_field_identifier,
                    field_value,
                    current_parameters,
                    use_parent_parameters=use_parent_parameters,
                )

        return propagated_component
# Manifest key under which a component declares the parameters that are propagated to its subcomponents.
PARAMETERS_STR: str = "$parameters"
# Default `type` inserted for a component that declares neither `type` nor `class_name`, keyed by
# "<ParentType>.<field_name>" of the field that holds the component.
DEFAULT_MODEL_TYPES: Mapping[str, str] = {
    # CompositeErrorHandler
    "CompositeErrorHandler.error_handlers": "DefaultErrorHandler",
    # CursorPagination
    "CursorPagination.decoder": "JsonDecoder",
    # DatetimeBasedCursor
    "DatetimeBasedCursor.end_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.end_time_option": "RequestOption",
    "DatetimeBasedCursor.start_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.start_time_option": "RequestOption",
    # CustomIncrementalSync
    "CustomIncrementalSync.end_datetime": "MinMaxDatetime",
    "CustomIncrementalSync.end_time_option": "RequestOption",
    "CustomIncrementalSync.start_datetime": "MinMaxDatetime",
    "CustomIncrementalSync.start_time_option": "RequestOption",
    # DeclarativeSource
    "DeclarativeSource.check": "CheckStream",
    "DeclarativeSource.spec": "Spec",
    "DeclarativeSource.streams": "DeclarativeStream",
    # DeclarativeStream
    "DeclarativeStream.retriever": "SimpleRetriever",
    "DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
    # DynamicDeclarativeStream
    "DynamicDeclarativeStream.stream_template": "DeclarativeStream",
    # Must match the "ConfigComponentsResolver.*" keys below so defaults chain to the resolver's own fields
    "DynamicDeclarativeStream.components_resolver": "ConfigComponentsResolver",
    # HttpComponentsResolver
    "HttpComponentsResolver.retriever": "SimpleRetriever",
    "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
    # ConfigComponentsResolver
    "ConfigComponentsResolver.stream_config": "StreamConfig",
    "ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition",
    # DefaultErrorHandler
    "DefaultErrorHandler.response_filters": "HttpResponseFilter",
    # DefaultPaginator
    "DefaultPaginator.decoder": "JsonDecoder",
    "DefaultPaginator.page_size_option": "RequestOption",
    # DpathExtractor
    "DpathExtractor.decoder": "JsonDecoder",
    # HttpRequester
    "HttpRequester.error_handler": "DefaultErrorHandler",
    # ListPartitionRouter
    "ListPartitionRouter.request_option": "RequestOption",
    # ParentStreamConfig
    "ParentStreamConfig.request_option": "RequestOption",
    "ParentStreamConfig.stream": "DeclarativeStream",
    # RecordSelector
    "RecordSelector.extractor": "DpathExtractor",
    "RecordSelector.record_filter": "RecordFilter",
    # SimpleRetriever
    "SimpleRetriever.paginator": "NoPagination",
    "SimpleRetriever.record_selector": "RecordSelector",
    "SimpleRetriever.requester": "HttpRequester",
    # SubstreamPartitionRouter
    "SubstreamPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    # AddFields
    "AddFields.fields": "AddedFieldDefinition",
    # CustomPartitionRouter
    "CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    # DynamicSchemaLoader
    "DynamicSchemaLoader.retriever": "SimpleRetriever",
    # SchemaTypeIdentifier
    "SchemaTypeIdentifier.types_map": "TypesMap",
}
# Default `type` inserted for a component that declares `class_name` but no `type`, keyed by
# "<ParentType>.<field_name>" of the field that holds the component.
CUSTOM_COMPONENTS_MAPPING: Mapping[str, str] = {
    # Error handling
    "CompositeErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DefaultErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "HttpRequester.error_handler": "CustomErrorHandler",
    # Stream-level components
    "DeclarativeStream.retriever": "CustomRetriever",
    "DeclarativeStream.transformations": "CustomTransformation",
    # Requesting, paging and extraction
    "DefaultPaginator.pagination_strategy": "CustomPaginationStrategy",
    "HttpRequester.authenticator": "CustomAuthenticator",
    "RecordSelector.extractor": "CustomRecordExtractor",
    "SimpleRetriever.partition_router": "CustomPartitionRouter",
}
class ManifestComponentTransformer:
    """Inserts default component types and propagates `$parameters` through a declarative manifest."""

    def propagate_types_and_parameters(
        self,
        parent_field_identifier: str,
        declarative_component: Mapping[str, Any],
        parent_parameters: Mapping[str, Any],
        use_parent_parameters: Optional[bool] = None,
    ) -> Dict[str, Any]:
        """
        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
        default component type if it was not already present. The resulting transformed components are a deep copy of the input
        components, not an in-place transformation.

        :param parent_field_identifier: The "<ParentType>.<field_name>" key (e.g. "SimpleRetriever.requester") used to look up
            a default type when the current component does not declare one
        :param declarative_component: The current component that is having type and parameters added
        :param parent_parameters: The parameters set on parent components defined before the current component
        :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
        :return: A deep copy of the transformed component with types and parameters persisted to it
        """
        propagated_component = dict(copy.deepcopy(declarative_component))
        if "type" not in propagated_component:
            # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
            # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
            # class_name will only be a valid field on custom components and this change reflects that. I checked, and we currently
            # have no low-code connectors that use class_name except for custom components.
            if "class_name" in propagated_component:
                found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
            else:
                found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
            if found_type:
                propagated_component["type"] = found_type

        # Combines parameters defined at the current level with parameters from parent components. Parameters at the current
        # level take precedence unless use_parent_parameters is set. An empty `$parameters:` entry (None) is treated as {}.
        inherited_parameters = dict(copy.deepcopy(parent_parameters))
        component_parameters = propagated_component.pop(PARAMETERS_STR, None) or {}
        current_parameters = (
            {**component_parameters, **inherited_parameters}
            if use_parent_parameters
            else {**inherited_parameters, **component_parameters}
        )

        # When the type refers to a json schema, we're not processing a component. This check is currently imperfect as there
        # could be json_schemas that are not objects, but we believe this is not likely in our case because:
        # * records are Mapping so objects hence SchemaLoader root should be an object
        # * connection_specification is a Mapping
        if self._is_json_schema_object(propagated_component):
            return propagated_component

        # When there is no resolved type, we're not processing a component (likely a regular object) and don't propagate
        # parameters onto it. Its object fields may still hold nested components (for example, QueryProperties in
        # requester.request_parameters), which receive the current `$parameters`.
        if "type" not in propagated_component:
            if self._has_nested_components(propagated_component):
                propagated_component = self._process_nested_components(
                    propagated_component,
                    parent_field_identifier,
                    current_parameters,
                    use_parent_parameters,
                )
            return propagated_component

        # Parameters are applied to the current component's fields, with an existing field taking precedence. Only missing or
        # None fields are filled in, so falsy values (False, 0, "") set explicitly on the component are preserved.
        for parameter_key, parameter_value in current_parameters.items():
            if propagated_component.get(parameter_key) is None:
                propagated_component[parameter_key] = parameter_value

        component_type = propagated_component.get("type")
        for field_name, field_value in propagated_component.items():
            if not isinstance(field_value, (dict, list)):
                continue
            # We exclude propagating a parameter that matches the current field name because that would result in an infinite
            # cycle. Filtering into a new dict (rather than popping and restoring) guarantees that falsy parameter values are
            # not lost for later fields or for this component's own `$parameters`.
            child_parameters = {
                key: value for key, value in current_parameters.items() if key != field_name
            }
            child_field_identifier = f"{component_type}.{field_name}"
            if isinstance(field_value, dict):
                propagated_component[field_name] = self.propagate_types_and_parameters(
                    child_field_identifier,
                    field_value,
                    child_parameters,
                    use_parent_parameters=use_parent_parameters,
                )
            else:
                for index, element in enumerate(field_value):
                    if isinstance(element, dict):
                        field_value[index] = self.propagate_types_and_parameters(
                            child_field_identifier,
                            element,
                            child_parameters,
                            use_parent_parameters=use_parent_parameters,
                        )

        if current_parameters:
            propagated_component[PARAMETERS_STR] = current_parameters
        return propagated_component

    @staticmethod
    def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
        """Return True when the component's "type" is "object" or ["null", "object"] (a JSON schema object)."""
        return propagated_component.get("type") == "object" or propagated_component.get("type") == [
            "null",
            "object",
        ]

    @staticmethod
    def _has_nested_components(propagated_component: Dict[str, Any]) -> bool:
        """Return True if any direct field of the component is a dict with a truthy "type"."""
        return any(
            isinstance(value, dict) and bool(value.get("type"))
            for value in propagated_component.values()
        )

    def _process_nested_components(
        self,
        propagated_component: Dict[str, Any],
        parent_field_identifier: str,
        current_parameters: Mapping[str, Any],
        use_parent_parameters: Optional[bool] = None,
    ) -> Dict[str, Any]:
        """
        Run propagate_types_and_parameters on every direct field that is a dict with a truthy "type", replacing that field
        in place, and return the same (mutated) dict.
        """
        for field_name, field_value in propagated_component.items():
            if isinstance(field_value, dict) and field_value.get("type"):
                propagated_component[field_name] = self.propagate_types_and_parameters(
                    parent_field_identifier,
                    field_value,
                    current_parameters,
                    use_parent_parameters=use_parent_parameters,
                )

        return propagated_component
def propagate_types_and_parameters(
    self,
    parent_field_identifier: str,
    declarative_component: Mapping[str, Any],
    parent_parameters: Mapping[str, Any],
    use_parent_parameters: Optional[bool] = None,
) -> Dict[str, Any]:
    """
    Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
    default component type if it was not already present. The resulting transformed components are a deep copy of the input
    components, not an in-place transformation.

    :param parent_field_identifier: The "<ParentType>.<field_name>" key (e.g. "SimpleRetriever.requester") used to look up
        a default type when the current component does not declare one
    :param declarative_component: The current component that is having type and parameters added
    :param parent_parameters: The parameters set on parent components defined before the current component
    :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
    :return: A deep copy of the transformed component with types and parameters persisted to it
    """
    propagated_component = dict(copy.deepcopy(declarative_component))
    if "type" not in propagated_component:
        # If the component has class_name we assume that this is a reference to a custom component. This is a slight change to
        # existing behavior because we originally allowed for either class or type to be specified. After the pydantic migration,
        # class_name will only be a valid field on custom components and this change reflects that. I checked, and we currently
        # have no low-code connectors that use class_name except for custom components.
        if "class_name" in propagated_component:
            found_type = CUSTOM_COMPONENTS_MAPPING.get(parent_field_identifier)
        else:
            found_type = DEFAULT_MODEL_TYPES.get(parent_field_identifier)
        if found_type:
            propagated_component["type"] = found_type

    # Combines parameters defined at the current level with parameters from parent components. Parameters at the current
    # level take precedence unless use_parent_parameters is set. An empty `$parameters:` entry (None) is treated as {}.
    inherited_parameters = dict(copy.deepcopy(parent_parameters))
    component_parameters = propagated_component.pop(PARAMETERS_STR, None) or {}
    current_parameters = (
        {**component_parameters, **inherited_parameters}
        if use_parent_parameters
        else {**inherited_parameters, **component_parameters}
    )

    # When the type refers to a json schema, we're not processing a component. This check is currently imperfect as there
    # could be json_schemas that are not objects, but we believe this is not likely in our case because:
    # * records are Mapping so objects hence SchemaLoader root should be an object
    # * connection_specification is a Mapping
    if self._is_json_schema_object(propagated_component):
        return propagated_component

    # When there is no resolved type, we're not processing a component (likely a regular object) and don't propagate
    # parameters onto it. Its object fields may still hold nested components (for example, QueryProperties in
    # requester.request_parameters), which receive the current `$parameters`.
    if "type" not in propagated_component:
        if self._has_nested_components(propagated_component):
            propagated_component = self._process_nested_components(
                propagated_component,
                parent_field_identifier,
                current_parameters,
                use_parent_parameters,
            )
        return propagated_component

    # Parameters are applied to the current component's fields, with an existing field taking precedence. Only missing or
    # None fields are filled in, so falsy values (False, 0, "") set explicitly on the component are preserved.
    for parameter_key, parameter_value in current_parameters.items():
        if propagated_component.get(parameter_key) is None:
            propagated_component[parameter_key] = parameter_value

    component_type = propagated_component.get("type")
    for field_name, field_value in propagated_component.items():
        if not isinstance(field_value, (dict, list)):
            continue
        # We exclude propagating a parameter that matches the current field name because that would result in an infinite
        # cycle. Filtering into a new dict (rather than popping and restoring) guarantees that falsy parameter values are
        # not lost for later fields or for this component's own `$parameters`.
        child_parameters = {
            key: value for key, value in current_parameters.items() if key != field_name
        }
        child_field_identifier = f"{component_type}.{field_name}"
        if isinstance(field_value, dict):
            propagated_component[field_name] = self.propagate_types_and_parameters(
                child_field_identifier,
                field_value,
                child_parameters,
                use_parent_parameters=use_parent_parameters,
            )
        else:
            for index, element in enumerate(field_value):
                if isinstance(element, dict):
                    field_value[index] = self.propagate_types_and_parameters(
                        child_field_identifier,
                        element,
                        child_parameters,
                        use_parent_parameters=use_parent_parameters,
                    )

    if current_parameters:
        propagated_component[PARAMETERS_STR] = current_parameters
    return propagated_component
Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the default component type if it was not already present. The resulting transformed components are a deep copy of the input components, not an in-place transformation.
Parameters
- declarative_component: The current component that is having type and parameters added
- parent_field_identifier: The name of the field of the current component coming from the parent component
- parent_parameters: The parameters set on parent components defined before the current component
- use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
Returns
A deep copy of the transformed component with types and parameters persisted to it