airbyte_cdk.sources.declarative.parsers.manifest_component_transformer
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
import typing
from typing import Any, Mapping, Optional

# Name of the manifest key under which a component declares the parameters it hands down to its subcomponents.
PARAMETERS_STR = "$parameters"


# Default component type to insert, keyed by "<ParentType>.<field_name>", when a subcomponent omits `type`.
DEFAULT_MODEL_TYPES: Mapping[str, str] = {
    # CompositeErrorHandler
    "CompositeErrorHandler.error_handlers": "DefaultErrorHandler",
    # CursorPagination
    "CursorPagination.decoder": "JsonDecoder",
    # DatetimeBasedCursor
    "DatetimeBasedCursor.end_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.end_time_option": "RequestOption",
    "DatetimeBasedCursor.start_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.start_time_option": "RequestOption",
    # CustomIncrementalSync
    "CustomIncrementalSync.end_datetime": "MinMaxDatetime",
    "CustomIncrementalSync.end_time_option": "RequestOption",
    "CustomIncrementalSync.start_datetime": "MinMaxDatetime",
    "CustomIncrementalSync.start_time_option": "RequestOption",
    # DeclarativeSource
    "DeclarativeSource.check": "CheckStream",
    "DeclarativeSource.spec": "Spec",
    "DeclarativeSource.streams": "DeclarativeStream",
    # DeclarativeStream
    "DeclarativeStream.retriever": "SimpleRetriever",
    "DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
    # DynamicDeclarativeStream
    "DynamicDeclarativeStream.stream_template": "DeclarativeStream",
    # NOTE(review): "ConfigComponentResolver" (singular "Component") does not match the "ConfigComponentsResolver"
    # prefix used by the keys further down — confirm the intended model name before relying on this default.
    "DynamicDeclarativeStream.components_resolver": "ConfigComponentResolver",
    # HttpComponentsResolver
    "HttpComponentsResolver.retriever": "SimpleRetriever",
    "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
    # ConfigComponentsResolver
    "ConfigComponentsResolver.stream_config": "StreamConfig",
    "ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition",
    # DefaultErrorHandler
    "DefaultErrorHandler.response_filters": "HttpResponseFilter",
    # DefaultPaginator
    "DefaultPaginator.decoder": "JsonDecoder",
    "DefaultPaginator.page_size_option": "RequestOption",
    # DpathExtractor
    "DpathExtractor.decoder": "JsonDecoder",
    # HttpRequester
    "HttpRequester.error_handler": "DefaultErrorHandler",
    # ListPartitionRouter
    "ListPartitionRouter.request_option": "RequestOption",
    # ParentStreamConfig
    "ParentStreamConfig.request_option": "RequestOption",
    "ParentStreamConfig.stream": "DeclarativeStream",
    # RecordSelector
    "RecordSelector.extractor": "DpathExtractor",
    "RecordSelector.record_filter": "RecordFilter",
    # SimpleRetriever
    "SimpleRetriever.paginator": "NoPagination",
    "SimpleRetriever.record_selector": "RecordSelector",
    "SimpleRetriever.requester": "HttpRequester",
    # SubstreamPartitionRouter
    "SubstreamPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    # AddFields
    "AddFields.fields": "AddedFieldDefinition",
    # CustomPartitionRouter
    "CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    # DynamicSchemaLoader
    "DynamicSchemaLoader.retriever": "SimpleRetriever",
    # SchemaTypeIdentifier
    "SchemaTypeIdentifier.types_map": "TypesMap",
}

# We retain a separate registry for custom components to automatically insert the type if it is missing. This is intended to
# be a short term fix because once we have migrated, then type and class_name should be requirements for all custom components.
CUSTOM_COMPONENTS_MAPPING: Mapping[str, str] = {
    "CompositeErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DeclarativeStream.retriever": "CustomRetriever",
    "DeclarativeStream.transformations": "CustomTransformation",
    "DefaultErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DefaultPaginator.pagination_strategy": "CustomPaginationStrategy",
    "HttpRequester.authenticator": "CustomAuthenticator",
    "HttpRequester.error_handler": "CustomErrorHandler",
    "RecordSelector.extractor": "CustomRecordExtractor",
    "SimpleRetriever.partition_router": "CustomPartitionRouter",
}


class ManifestComponentTransformer:
    """Inserts default component types and propagates `$parameters` through a declarative component tree."""

    def propagate_types_and_parameters(
        self,
        parent_field_identifier: str,
        declarative_component: Mapping[str, Any],
        parent_parameters: Mapping[str, Any],
        use_parent_parameters: Optional[bool] = None,
    ) -> Mapping[str, Any]:
        """
        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
        default component type if it was not already present. The resulting transformed components are a deep copy of the input
        components, not an in-place transformation.

        A field that is explicitly set on a component (including falsy values such as 0, False or "") always wins over a
        parameter with the same name; only a missing or None field is filled from the parameters.

        :param declarative_component: The current component that is having type and parameters added
        :param parent_field_identifier: The name of the field of the current component coming from the parent component
        :param parent_parameters: The parameters set on parent components defined before the current component
        :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
        :return: A deep copy of the transformed component with types and parameters persisted to it
        """
        propagated_component = dict(copy.deepcopy(declarative_component))
        if "type" not in propagated_component:
            # If the component has class_name we assume that this is a reference to a custom component, so the type is
            # looked up in the custom registry; otherwise the default model registry is used.
            type_registry = (
                CUSTOM_COMPONENTS_MAPPING
                if "class_name" in propagated_component
                else DEFAULT_MODEL_TYPES
            )
            found_type = type_registry.get(parent_field_identifier)
            if found_type:
                propagated_component["type"] = found_type

        # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to
        # propagate parameters. When the type refers to a JSON schema, we're not processing a component either. This check
        # is imperfect as there could be JSON schemas that are not objects, but we believe this is unlikely because:
        #  * records are Mapping so objects hence SchemaLoader root should be an object
        #  * connection_specification is a Mapping
        if "type" not in propagated_component or self._is_json_schema_object(propagated_component):
            return propagated_component

        # Combines parameters defined at the current level with parameters from parent components. Parameters at the
        # current level take precedence unless use_parent_parameters is set.
        inherited_parameters = dict(copy.deepcopy(parent_parameters))
        # `$parameters: null` in a manifest yields None here; treat it the same as an absent key.
        component_parameters = propagated_component.pop(PARAMETERS_STR, None) or {}
        current_parameters = (
            {**component_parameters, **inherited_parameters}
            if use_parent_parameters
            else {**inherited_parameters, **component_parameters}
        )

        # Parameters are applied to the current component's fields, with an existing field taking precedence. Only a
        # missing/None field is filled so that explicit falsy values (0, False, "") are not clobbered by a parameter.
        for parameter_key, parameter_value in current_parameters.items():
            if propagated_component.get(parameter_key) is None:
                propagated_component[parameter_key] = parameter_value

        component_type = propagated_component.get("type")
        # Only existing keys are reassigned below, so iterating the dict while updating it is safe.
        for field_name, field_value in propagated_component.items():
            if not isinstance(field_value, (dict, list)):
                continue

            # We exclude propagating a parameter that matches the current field name because that would result in an
            # infinite cycle. Building a filtered copy (instead of pop/restore) keeps the parameter, whatever its value,
            # on this component and on its other fields.
            child_parameters = {
                key: value for key, value in current_parameters.items() if key != field_name
            }
            child_field_identifier = f"{component_type}.{field_name}"

            if isinstance(field_value, dict):
                propagated_component[field_name] = self.propagate_types_and_parameters(
                    child_field_identifier,
                    field_value,
                    child_parameters,
                    use_parent_parameters=use_parent_parameters,
                )
            else:
                # Only mapping elements are components; scalars and nested lists are kept as they are.
                propagated_component[field_name] = [
                    self.propagate_types_and_parameters(
                        child_field_identifier,
                        element,
                        child_parameters,
                        use_parent_parameters=use_parent_parameters,
                    )
                    if isinstance(element, dict)
                    else element
                    for element in field_value
                ]

        if current_parameters:
            propagated_component[PARAMETERS_STR] = current_parameters
        return propagated_component

    @staticmethod
    def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
        """Return True when the mapping's `type` is "object", i.e. it looks like a JSON schema rather than a component."""
        return propagated_component.get("type") == "object"
PARAMETERS_STR =
'$parameters'
DEFAULT_MODEL_TYPES: Mapping[str, str] =
{'CompositeErrorHandler.error_handlers': 'DefaultErrorHandler', 'CursorPagination.decoder': 'JsonDecoder', 'DatetimeBasedCursor.end_datetime': 'MinMaxDatetime', 'DatetimeBasedCursor.end_time_option': 'RequestOption', 'DatetimeBasedCursor.start_datetime': 'MinMaxDatetime', 'DatetimeBasedCursor.start_time_option': 'RequestOption', 'CustomIncrementalSync.end_datetime': 'MinMaxDatetime', 'CustomIncrementalSync.end_time_option': 'RequestOption', 'CustomIncrementalSync.start_datetime': 'MinMaxDatetime', 'CustomIncrementalSync.start_time_option': 'RequestOption', 'DeclarativeSource.check': 'CheckStream', 'DeclarativeSource.spec': 'Spec', 'DeclarativeSource.streams': 'DeclarativeStream', 'DeclarativeStream.retriever': 'SimpleRetriever', 'DeclarativeStream.schema_loader': 'JsonFileSchemaLoader', 'DynamicDeclarativeStream.stream_template': 'DeclarativeStream', 'DynamicDeclarativeStream.components_resolver': 'ConfigComponentResolver', 'HttpComponentsResolver.retriever': 'SimpleRetriever', 'HttpComponentsResolver.components_mapping': 'ComponentMappingDefinition', 'ConfigComponentsResolver.stream_config': 'StreamConfig', 'ConfigComponentsResolver.components_mapping': 'ComponentMappingDefinition', 'DefaultErrorHandler.response_filters': 'HttpResponseFilter', 'DefaultPaginator.decoder': 'JsonDecoder', 'DefaultPaginator.page_size_option': 'RequestOption', 'DpathExtractor.decoder': 'JsonDecoder', 'HttpRequester.error_handler': 'DefaultErrorHandler', 'ListPartitionRouter.request_option': 'RequestOption', 'ParentStreamConfig.request_option': 'RequestOption', 'ParentStreamConfig.stream': 'DeclarativeStream', 'RecordSelector.extractor': 'DpathExtractor', 'RecordSelector.record_filter': 'RecordFilter', 'SimpleRetriever.paginator': 'NoPagination', 'SimpleRetriever.record_selector': 'RecordSelector', 'SimpleRetriever.requester': 'HttpRequester', 'SubstreamPartitionRouter.parent_stream_configs': 'ParentStreamConfig', 'AddFields.fields': 'AddedFieldDefinition', 
'CustomPartitionRouter.parent_stream_configs': 'ParentStreamConfig', 'DynamicSchemaLoader.retriever': 'SimpleRetriever', 'SchemaTypeIdentifier.types_map': 'TypesMap'}
CUSTOM_COMPONENTS_MAPPING: Mapping[str, str] =
{'CompositeErrorHandler.backoff_strategies': 'CustomBackoffStrategy', 'DeclarativeStream.retriever': 'CustomRetriever', 'DeclarativeStream.transformations': 'CustomTransformation', 'DefaultErrorHandler.backoff_strategies': 'CustomBackoffStrategy', 'DefaultPaginator.pagination_strategy': 'CustomPaginationStrategy', 'HttpRequester.authenticator': 'CustomAuthenticator', 'HttpRequester.error_handler': 'CustomErrorHandler', 'RecordSelector.extractor': 'CustomRecordExtractor', 'SimpleRetriever.partition_router': 'CustomPartitionRouter'}
class
ManifestComponentTransformer:
class ManifestComponentTransformer:
    """Inserts default component types and propagates `$parameters` through a declarative component tree."""

    def propagate_types_and_parameters(
        self,
        parent_field_identifier: str,
        declarative_component: Mapping[str, Any],
        parent_parameters: Mapping[str, Any],
        use_parent_parameters: Optional[bool] = None,
    ) -> Mapping[str, Any]:
        """
        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
        default component type if it was not already present. The resulting transformed components are a deep copy of the input
        components, not an in-place transformation.

        A field that is explicitly set on a component (including falsy values such as 0, False or "") always wins over a
        parameter with the same name; only a missing or None field is filled from the parameters.

        :param declarative_component: The current component that is having type and parameters added
        :param parent_field_identifier: The name of the field of the current component coming from the parent component
        :param parent_parameters: The parameters set on parent components defined before the current component
        :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
        :return: A deep copy of the transformed component with types and parameters persisted to it
        """
        propagated_component = dict(copy.deepcopy(declarative_component))
        if "type" not in propagated_component:
            # If the component has class_name we assume that this is a reference to a custom component, so the type is
            # looked up in the custom registry; otherwise the default model registry is used.
            type_registry = (
                CUSTOM_COMPONENTS_MAPPING
                if "class_name" in propagated_component
                else DEFAULT_MODEL_TYPES
            )
            found_type = type_registry.get(parent_field_identifier)
            if found_type:
                propagated_component["type"] = found_type

        # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to
        # propagate parameters. When the type refers to a JSON schema, we're not processing a component either. This check
        # is imperfect as there could be JSON schemas that are not objects, but we believe this is unlikely because:
        #  * records are Mapping so objects hence SchemaLoader root should be an object
        #  * connection_specification is a Mapping
        if "type" not in propagated_component or self._is_json_schema_object(propagated_component):
            return propagated_component

        # Combines parameters defined at the current level with parameters from parent components. Parameters at the
        # current level take precedence unless use_parent_parameters is set.
        inherited_parameters = dict(copy.deepcopy(parent_parameters))
        # `$parameters: null` in a manifest yields None here; treat it the same as an absent key.
        component_parameters = propagated_component.pop(PARAMETERS_STR, None) or {}
        current_parameters = (
            {**component_parameters, **inherited_parameters}
            if use_parent_parameters
            else {**inherited_parameters, **component_parameters}
        )

        # Parameters are applied to the current component's fields, with an existing field taking precedence. Only a
        # missing/None field is filled so that explicit falsy values (0, False, "") are not clobbered by a parameter.
        for parameter_key, parameter_value in current_parameters.items():
            if propagated_component.get(parameter_key) is None:
                propagated_component[parameter_key] = parameter_value

        component_type = propagated_component.get("type")
        # Only existing keys are reassigned below, so iterating the dict while updating it is safe.
        for field_name, field_value in propagated_component.items():
            if not isinstance(field_value, (dict, list)):
                continue

            # We exclude propagating a parameter that matches the current field name because that would result in an
            # infinite cycle. Building a filtered copy (instead of pop/restore) keeps the parameter, whatever its value,
            # on this component and on its other fields.
            child_parameters = {
                key: value for key, value in current_parameters.items() if key != field_name
            }
            child_field_identifier = f"{component_type}.{field_name}"

            if isinstance(field_value, dict):
                propagated_component[field_name] = self.propagate_types_and_parameters(
                    child_field_identifier,
                    field_value,
                    child_parameters,
                    use_parent_parameters=use_parent_parameters,
                )
            else:
                # Only mapping elements are components; scalars and nested lists are kept as they are.
                propagated_component[field_name] = [
                    self.propagate_types_and_parameters(
                        child_field_identifier,
                        element,
                        child_parameters,
                        use_parent_parameters=use_parent_parameters,
                    )
                    if isinstance(element, dict)
                    else element
                    for element in field_value
                ]

        if current_parameters:
            propagated_component[PARAMETERS_STR] = current_parameters
        return propagated_component

    @staticmethod
    def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
        """Return True when the mapping's `type` is "object", i.e. it looks like a JSON schema rather than a component."""
        return propagated_component.get("type") == "object"
def
propagate_types_and_parameters( self, parent_field_identifier: str, declarative_component: Mapping[str, Any], parent_parameters: Mapping[str, Any], use_parent_parameters: Optional[bool] = None) -> Mapping[str, Any]:
def propagate_types_and_parameters(
    self,
    parent_field_identifier: str,
    declarative_component: Mapping[str, Any],
    parent_parameters: Mapping[str, Any],
    use_parent_parameters: Optional[bool] = None,
) -> Mapping[str, Any]:
    """
    Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
    default component type if it was not already present. The resulting transformed components are a deep copy of the input
    components, not an in-place transformation.

    A field that is explicitly set on a component (including falsy values such as 0, False or "") always wins over a
    parameter with the same name; only a missing or None field is filled from the parameters.

    :param declarative_component: The current component that is having type and parameters added
    :param parent_field_identifier: The name of the field of the current component coming from the parent component
    :param parent_parameters: The parameters set on parent components defined before the current component
    :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
    :return: A deep copy of the transformed component with types and parameters persisted to it
    """
    propagated_component = dict(copy.deepcopy(declarative_component))
    if "type" not in propagated_component:
        # If the component has class_name we assume that this is a reference to a custom component, so the type is
        # looked up in the custom registry; otherwise the default model registry is used.
        type_registry = (
            CUSTOM_COMPONENTS_MAPPING
            if "class_name" in propagated_component
            else DEFAULT_MODEL_TYPES
        )
        found_type = type_registry.get(parent_field_identifier)
        if found_type:
            propagated_component["type"] = found_type

    # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to
    # propagate parameters. When the type refers to a JSON schema, we're not processing a component either. This check
    # is imperfect as there could be JSON schemas that are not objects, but we believe this is unlikely because:
    #  * records are Mapping so objects hence SchemaLoader root should be an object
    #  * connection_specification is a Mapping
    if "type" not in propagated_component or self._is_json_schema_object(propagated_component):
        return propagated_component

    # Combines parameters defined at the current level with parameters from parent components. Parameters at the
    # current level take precedence unless use_parent_parameters is set.
    inherited_parameters = dict(copy.deepcopy(parent_parameters))
    # `$parameters: null` in a manifest yields None here; treat it the same as an absent key.
    component_parameters = propagated_component.pop(PARAMETERS_STR, None) or {}
    current_parameters = (
        {**component_parameters, **inherited_parameters}
        if use_parent_parameters
        else {**inherited_parameters, **component_parameters}
    )

    # Parameters are applied to the current component's fields, with an existing field taking precedence. Only a
    # missing/None field is filled so that explicit falsy values (0, False, "") are not clobbered by a parameter.
    for parameter_key, parameter_value in current_parameters.items():
        if propagated_component.get(parameter_key) is None:
            propagated_component[parameter_key] = parameter_value

    component_type = propagated_component.get("type")
    # Only existing keys are reassigned below, so iterating the dict while updating it is safe.
    for field_name, field_value in propagated_component.items():
        if not isinstance(field_value, (dict, list)):
            continue

        # We exclude propagating a parameter that matches the current field name because that would result in an
        # infinite cycle. Building a filtered copy (instead of pop/restore) keeps the parameter, whatever its value,
        # on this component and on its other fields.
        child_parameters = {
            key: value for key, value in current_parameters.items() if key != field_name
        }
        child_field_identifier = f"{component_type}.{field_name}"

        if isinstance(field_value, dict):
            propagated_component[field_name] = self.propagate_types_and_parameters(
                child_field_identifier,
                field_value,
                child_parameters,
                use_parent_parameters=use_parent_parameters,
            )
        else:
            # Only mapping elements are components; scalars and nested lists are kept as they are.
            propagated_component[field_name] = [
                self.propagate_types_and_parameters(
                    child_field_identifier,
                    element,
                    child_parameters,
                    use_parent_parameters=use_parent_parameters,
                )
                if isinstance(element, dict)
                else element
                for element in field_value
            ]

    if current_parameters:
        propagated_component[PARAMETERS_STR] = current_parameters
    return propagated_component
Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the default component type if it was not already present. The resulting transformed components are a deep copy of the input components, not an in-place transformation.
Parameters
- declarative_component: The current component that is having type and parameters added
- parent_field_identifier: The name of the field of the current component coming from the parent component
- parent_parameters: The parameters set on parent components defined before the current component
- use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
Returns
A deep copy of the transformed component with types and parameters persisted to it