airbyte_cdk.sources.declarative.parsers.manifest_component_transformer
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
import typing
from typing import Any, Mapping

# Manifest key under which a component declares parameters that are propagated to its subcomponents.
PARAMETERS_STR = "$parameters"


# Default component type inserted when a subcomponent omits "type".
# Keys have the form "<ParentComponentType>.<field_name>".
DEFAULT_MODEL_TYPES: Mapping[str, str] = {
    # CompositeErrorHandler
    "CompositeErrorHandler.error_handlers": "DefaultErrorHandler",
    # CursorPagination
    "CursorPagination.decoder": "JsonDecoder",
    # DatetimeBasedCursor
    "DatetimeBasedCursor.end_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.end_time_option": "RequestOption",
    "DatetimeBasedCursor.start_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.start_time_option": "RequestOption",
    # CustomIncrementalSync
    "CustomIncrementalSync.end_datetime": "MinMaxDatetime",
    "CustomIncrementalSync.end_time_option": "RequestOption",
    "CustomIncrementalSync.start_datetime": "MinMaxDatetime",
    "CustomIncrementalSync.start_time_option": "RequestOption",
    # DeclarativeSource
    "DeclarativeSource.check": "CheckStream",
    "DeclarativeSource.spec": "Spec",
    "DeclarativeSource.streams": "DeclarativeStream",
    # DeclarativeStream
    "DeclarativeStream.retriever": "SimpleRetriever",
    "DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
    # DynamicDeclarativeStream
    "DynamicDeclarativeStream.stream_template": "DeclarativeStream",
    # The value must match the "ConfigComponentsResolver.*" keys below, otherwise the defaults of a
    # resolver whose type was inserted here can never be found when recursing into it.
    "DynamicDeclarativeStream.components_resolver": "ConfigComponentsResolver",
    # HttpComponentsResolver
    "HttpComponentsResolver.retriever": "SimpleRetriever",
    "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
    # ConfigComponentsResolver
    "ConfigComponentsResolver.stream_config": "StreamConfig",
    "ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition",
    # DefaultErrorHandler
    "DefaultErrorHandler.response_filters": "HttpResponseFilter",
    # DefaultPaginator
    "DefaultPaginator.decoder": "JsonDecoder",
    "DefaultPaginator.page_size_option": "RequestOption",
    # DpathExtractor
    "DpathExtractor.decoder": "JsonDecoder",
    # HttpRequester
    "HttpRequester.error_handler": "DefaultErrorHandler",
    # ListPartitionRouter
    "ListPartitionRouter.request_option": "RequestOption",
    # ParentStreamConfig
    "ParentStreamConfig.request_option": "RequestOption",
    "ParentStreamConfig.stream": "DeclarativeStream",
    # RecordSelector
    "RecordSelector.extractor": "DpathExtractor",
    "RecordSelector.record_filter": "RecordFilter",
    # SimpleRetriever
    "SimpleRetriever.paginator": "NoPagination",
    "SimpleRetriever.record_selector": "RecordSelector",
    "SimpleRetriever.requester": "HttpRequester",
    # SubstreamPartitionRouter
    "SubstreamPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    # AddFields
    "AddFields.fields": "AddedFieldDefinition",
    # CustomPartitionRouter
    "CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    # DynamicSchemaLoader
    "DynamicSchemaLoader.retriever": "SimpleRetriever",
    # SchemaTypeIdentifier
    "SchemaTypeIdentifier.types_map": "TypesMap",
}

# We retain a separate registry for custom components to automatically insert the type if it is missing. This is intended to
# be a short term fix because once we have migrated, then type and class_name should be requirements for all custom components.
CUSTOM_COMPONENTS_MAPPING: Mapping[str, str] = {
    "CompositeErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DeclarativeStream.retriever": "CustomRetriever",
    "DeclarativeStream.transformations": "CustomTransformation",
    "DefaultErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DefaultPaginator.pagination_strategy": "CustomPaginationStrategy",
    "HttpRequester.authenticator": "CustomAuthenticator",
    "HttpRequester.error_handler": "CustomErrorHandler",
    "RecordSelector.extractor": "CustomRecordExtractor",
    "SimpleRetriever.partition_router": "CustomPartitionRouter",
}


class ManifestComponentTransformer:
    """Inserts default component types and propagates ``$parameters`` through a declarative component tree."""

    def propagate_types_and_parameters(
        self,
        parent_field_identifier: str,
        declarative_component: Mapping[str, Any],
        parent_parameters: Mapping[str, Any],
    ) -> Mapping[str, Any]:
        """
        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
        default component type if it was not already present. The resulting transformed components are a deep copy of the input
        components, not an in-place transformation.

        :param parent_field_identifier: The name of the field of the current component coming from the parent component
        :param declarative_component: The current component that is having type and parameters added
        :param parent_parameters: The parameters set on parent components defined before the current component
        :return: A deep copy of the transformed component with types and parameters persisted to it
        """
        propagated_component = dict(copy.deepcopy(declarative_component))
        if "type" not in propagated_component:
            # A component carrying class_name is assumed to reference a custom component, so its missing type is looked up
            # in the custom registry; everything else falls back to the default model types.
            if "class_name" in propagated_component:
                registry = CUSTOM_COMPONENTS_MAPPING
            else:
                registry = DEFAULT_MODEL_TYPES
            found_type = registry.get(parent_field_identifier)
            if found_type:
                propagated_component["type"] = found_type

        # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to
        # propagate parameters. When the type refers to a json schema, we're not processing a component either. This check is
        # imperfect because there could be json schemas that are not objects, but we believe this is unlikely in our case:
        #  * records are Mapping so objects hence SchemaLoader root should be an object
        #  * connection_specification is a Mapping
        if "type" not in propagated_component or self._is_json_schema_object(propagated_component):
            return propagated_component

        # Combine parameters defined at the current level with parameters from parent components. Parameters at the current
        # level take precedence. An explicit null "$parameters" is treated like an absent one.
        component_parameters = propagated_component.pop(PARAMETERS_STR, None) or {}
        current_parameters = {**copy.deepcopy(parent_parameters), **component_parameters}

        # Parameters are applied to the current component's fields, with an existing field taking precedence. Only a missing
        # or null field is filled in: a truthiness test here would let a parameter silently override an explicit False, 0
        # or empty string.
        for parameter_key, parameter_value in current_parameters.items():
            if propagated_component.get(parameter_key) is None:
                propagated_component[parameter_key] = parameter_value

        component_type = propagated_component.get("type")
        # Sentinel distinguishing "no parameter with this name" from a parameter whose value happens to be falsy.
        missing = object()
        for field_name, field_value in propagated_component.items():
            if not isinstance(field_value, (dict, list)):
                continue

            # We exclude propagating a parameter that matches the current field name because that would result in an
            # infinite cycle. It is put back once the subcomponents have been processed.
            excluded_parameter = current_parameters.pop(field_name, missing)
            child_field_identifier = f"{component_type}.{field_name}"
            if isinstance(field_value, dict):
                propagated_component[field_name] = self.propagate_types_and_parameters(
                    child_field_identifier, field_value, current_parameters
                )
            else:
                # Only mapping elements can be components; other list elements are left untouched.
                for index, element in enumerate(field_value):
                    if isinstance(element, dict):
                        field_value[index] = self.propagate_types_and_parameters(
                            child_field_identifier, element, current_parameters
                        )
            if excluded_parameter is not missing:
                current_parameters[field_name] = excluded_parameter

        if current_parameters:
            propagated_component[PARAMETERS_STR] = current_parameters
        return propagated_component

    @staticmethod
    def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
        """Return True when the mapping's "type" is "object", i.e. it is treated as a json schema and not a component."""
        return propagated_component.get("type") == "object"
# Manifest key under which a component declares parameters that are propagated to its subcomponents.
PARAMETERS_STR = "$parameters"

# Default component type inserted when a subcomponent omits "type".
# Keys have the form "<ParentComponentType>.<field_name>".
DEFAULT_MODEL_TYPES: Mapping[str, str] = {
    "CompositeErrorHandler.error_handlers": "DefaultErrorHandler",
    "CursorPagination.decoder": "JsonDecoder",
    "DatetimeBasedCursor.end_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.end_time_option": "RequestOption",
    "DatetimeBasedCursor.start_datetime": "MinMaxDatetime",
    "DatetimeBasedCursor.start_time_option": "RequestOption",
    "CustomIncrementalSync.end_datetime": "MinMaxDatetime",
    "CustomIncrementalSync.end_time_option": "RequestOption",
    "CustomIncrementalSync.start_datetime": "MinMaxDatetime",
    "CustomIncrementalSync.start_time_option": "RequestOption",
    "DeclarativeSource.check": "CheckStream",
    "DeclarativeSource.spec": "Spec",
    "DeclarativeSource.streams": "DeclarativeStream",
    "DeclarativeStream.retriever": "SimpleRetriever",
    "DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
    "DynamicDeclarativeStream.stream_template": "DeclarativeStream",
    # Must match the "ConfigComponentsResolver.*" keys below so that the nested defaults of a
    # resolver whose type was inserted from this entry can be found.
    "DynamicDeclarativeStream.components_resolver": "ConfigComponentsResolver",
    "HttpComponentsResolver.retriever": "SimpleRetriever",
    "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
    "ConfigComponentsResolver.stream_config": "StreamConfig",
    "ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition",
    "DefaultErrorHandler.response_filters": "HttpResponseFilter",
    "DefaultPaginator.decoder": "JsonDecoder",
    "DefaultPaginator.page_size_option": "RequestOption",
    "DpathExtractor.decoder": "JsonDecoder",
    "HttpRequester.error_handler": "DefaultErrorHandler",
    "ListPartitionRouter.request_option": "RequestOption",
    "ParentStreamConfig.request_option": "RequestOption",
    "ParentStreamConfig.stream": "DeclarativeStream",
    "RecordSelector.extractor": "DpathExtractor",
    "RecordSelector.record_filter": "RecordFilter",
    "SimpleRetriever.paginator": "NoPagination",
    "SimpleRetriever.record_selector": "RecordSelector",
    "SimpleRetriever.requester": "HttpRequester",
    "SubstreamPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    "AddFields.fields": "AddedFieldDefinition",
    "CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
    "DynamicSchemaLoader.retriever": "SimpleRetriever",
    "SchemaTypeIdentifier.types_map": "TypesMap",
}

# Component type inserted when a subcomponent has "class_name" but omits "type".
# Keys have the form "<ParentComponentType>.<field_name>".
CUSTOM_COMPONENTS_MAPPING: Mapping[str, str] = {
    "CompositeErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DeclarativeStream.retriever": "CustomRetriever",
    "DeclarativeStream.transformations": "CustomTransformation",
    "DefaultErrorHandler.backoff_strategies": "CustomBackoffStrategy",
    "DefaultPaginator.pagination_strategy": "CustomPaginationStrategy",
    "HttpRequester.authenticator": "CustomAuthenticator",
    "HttpRequester.error_handler": "CustomErrorHandler",
    "RecordSelector.extractor": "CustomRecordExtractor",
    "SimpleRetriever.partition_router": "CustomPartitionRouter",
}
class
ManifestComponentTransformer:
class ManifestComponentTransformer:
    """Inserts default component types and propagates ``$parameters`` through a declarative component tree."""

    def propagate_types_and_parameters(
        self,
        parent_field_identifier: str,
        declarative_component: Mapping[str, Any],
        parent_parameters: Mapping[str, Any],
    ) -> Mapping[str, Any]:
        """
        Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
        default component type if it was not already present. The resulting transformed components are a deep copy of the input
        components, not an in-place transformation.

        :param parent_field_identifier: The name of the field of the current component coming from the parent component
        :param declarative_component: The current component that is having type and parameters added
        :param parent_parameters: The parameters set on parent components defined before the current component
        :return: A deep copy of the transformed component with types and parameters persisted to it
        """
        propagated_component = dict(copy.deepcopy(declarative_component))
        if "type" not in propagated_component:
            # A component carrying class_name is assumed to reference a custom component, so its missing type is looked up
            # in the custom registry; everything else falls back to the default model types.
            if "class_name" in propagated_component:
                registry = CUSTOM_COMPONENTS_MAPPING
            else:
                registry = DEFAULT_MODEL_TYPES
            found_type = registry.get(parent_field_identifier)
            if found_type:
                propagated_component["type"] = found_type

        # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to
        # propagate parameters. When the type refers to a json schema, we're not processing a component either. This check is
        # imperfect because there could be json schemas that are not objects, but we believe this is unlikely in our case:
        #  * records are Mapping so objects hence SchemaLoader root should be an object
        #  * connection_specification is a Mapping
        if "type" not in propagated_component or self._is_json_schema_object(propagated_component):
            return propagated_component

        # Combine parameters defined at the current level with parameters from parent components. Parameters at the current
        # level take precedence. An explicit null "$parameters" is treated like an absent one.
        component_parameters = propagated_component.pop(PARAMETERS_STR, None) or {}
        current_parameters = {**copy.deepcopy(parent_parameters), **component_parameters}

        # Parameters are applied to the current component's fields, with an existing field taking precedence. Only a missing
        # or null field is filled in: a truthiness test here would let a parameter silently override an explicit False, 0
        # or empty string.
        for parameter_key, parameter_value in current_parameters.items():
            if propagated_component.get(parameter_key) is None:
                propagated_component[parameter_key] = parameter_value

        component_type = propagated_component.get("type")
        # Sentinel distinguishing "no parameter with this name" from a parameter whose value happens to be falsy.
        missing = object()
        for field_name, field_value in propagated_component.items():
            if not isinstance(field_value, (dict, list)):
                continue

            # We exclude propagating a parameter that matches the current field name because that would result in an
            # infinite cycle. It is put back once the subcomponents have been processed.
            excluded_parameter = current_parameters.pop(field_name, missing)
            child_field_identifier = f"{component_type}.{field_name}"
            if isinstance(field_value, dict):
                propagated_component[field_name] = self.propagate_types_and_parameters(
                    child_field_identifier, field_value, current_parameters
                )
            else:
                # Only mapping elements can be components; other list elements are left untouched.
                for index, element in enumerate(field_value):
                    if isinstance(element, dict):
                        field_value[index] = self.propagate_types_and_parameters(
                            child_field_identifier, element, current_parameters
                        )
            if excluded_parameter is not missing:
                current_parameters[field_name] = excluded_parameter

        if current_parameters:
            propagated_component[PARAMETERS_STR] = current_parameters
        return propagated_component

    @staticmethod
    def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
        """Return True when the mapping's "type" is "object", i.e. it is treated as a json schema and not a component."""
        return propagated_component.get("type") == "object"
def
propagate_types_and_parameters( self, parent_field_identifier: str, declarative_component: Mapping[str, Any], parent_parameters: Mapping[str, Any]) -> Mapping[str, Any]:
def propagate_types_and_parameters(
    self,
    parent_field_identifier: str,
    declarative_component: Mapping[str, Any],
    parent_parameters: Mapping[str, Any],
) -> Mapping[str, Any]:
    """
    Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
    default component type if it was not already present. The resulting transformed components are a deep copy of the input
    components, not an in-place transformation.

    :param parent_field_identifier: The name of the field of the current component coming from the parent component
    :param declarative_component: The current component that is having type and parameters added
    :param parent_parameters: The parameters set on parent components defined before the current component
    :return: A deep copy of the transformed component with types and parameters persisted to it
    """
    propagated_component = dict(copy.deepcopy(declarative_component))
    if "type" not in propagated_component:
        # A component carrying class_name is assumed to reference a custom component, so its missing type is looked up
        # in the custom registry; everything else falls back to the default model types.
        if "class_name" in propagated_component:
            registry = CUSTOM_COMPONENTS_MAPPING
        else:
            registry = DEFAULT_MODEL_TYPES
        found_type = registry.get(parent_field_identifier)
        if found_type:
            propagated_component["type"] = found_type

    # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to
    # propagate parameters. When the type refers to a json schema, we're not processing a component either. This check is
    # imperfect because there could be json schemas that are not objects, but we believe this is unlikely in our case:
    #  * records are Mapping so objects hence SchemaLoader root should be an object
    #  * connection_specification is a Mapping
    if "type" not in propagated_component or self._is_json_schema_object(propagated_component):
        return propagated_component

    # Combine parameters defined at the current level with parameters from parent components. Parameters at the current
    # level take precedence. An explicit null "$parameters" is treated like an absent one.
    component_parameters = propagated_component.pop(PARAMETERS_STR, None) or {}
    current_parameters = {**copy.deepcopy(parent_parameters), **component_parameters}

    # Parameters are applied to the current component's fields, with an existing field taking precedence. Only a missing
    # or null field is filled in: a truthiness test here would let a parameter silently override an explicit False, 0
    # or empty string.
    for parameter_key, parameter_value in current_parameters.items():
        if propagated_component.get(parameter_key) is None:
            propagated_component[parameter_key] = parameter_value

    component_type = propagated_component.get("type")
    # Sentinel distinguishing "no parameter with this name" from a parameter whose value happens to be falsy.
    missing = object()
    for field_name, field_value in propagated_component.items():
        if not isinstance(field_value, (dict, list)):
            continue

        # We exclude propagating a parameter that matches the current field name because that would result in an
        # infinite cycle. It is put back once the subcomponents have been processed.
        excluded_parameter = current_parameters.pop(field_name, missing)
        child_field_identifier = f"{component_type}.{field_name}"
        if isinstance(field_value, dict):
            propagated_component[field_name] = self.propagate_types_and_parameters(
                child_field_identifier, field_value, current_parameters
            )
        else:
            # Only mapping elements can be components; other list elements are left untouched.
            for index, element in enumerate(field_value):
                if isinstance(element, dict):
                    field_value[index] = self.propagate_types_and_parameters(
                        child_field_identifier, element, current_parameters
                    )
        if excluded_parameter is not missing:
            current_parameters[field_name] = excluded_parameter

    if current_parameters:
        propagated_component[PARAMETERS_STR] = current_parameters
    return propagated_component
Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the default component type if it was not already present. The resulting transformed components are a deep copy of the input components, not an in-place transformation.
Parameters
- declarative_component: The current component that is having type and parameters added
- parent_field_identifier: The name of the field of the current component coming from the parent component
- parent_parameters: The parameters set on parent components defined before the current component
Returns
A deep copy of the transformed component with types and parameters persisted to it