airbyte_cdk.sources.declarative.concurrent_declarative_source
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple

from airbyte_cdk.models import (
    AirbyteCatalog,
    AirbyteMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
)
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.extractors import RecordSelector
from airbyte_cdk.sources.declarative.extractors.record_filter import (
    ClientSideIncrementalRecordFilterDecorator,
)
from airbyte_cdk.sources.declarative.incremental import (
    ConcurrentPerPartitionCursor,
    GlobalSubstreamCursor,
)
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
    PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.models import FileUploader
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    ConcurrencyLevel as ConcurrencyLevelModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DatetimeBasedCursor as DatetimeBasedCursorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    IncrementingCountCursor as IncrementingCountCursorModel,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
    DeclarativePartitionFactory,
    StreamSlicerPartitionGenerator,
)
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
from airbyte_cdk.sources.source import TState
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream


class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]):
    # By default, we defer to a value of 2. A value lower than that could cause a PartitionEnqueuer to be stuck in a state of deadlock
    # because it has hit the limit of futures but no partition reader is consuming them.
    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2

    def __init__(
        self,
        catalog: Optional[ConfiguredAirbyteCatalog],
        config: Optional[Mapping[str, Any]],
        state: TState,
        source_config: ConnectionDefinition,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        component_factory: Optional[ModelToComponentFactory] = None,
        config_path: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
        # no longer needs to store the original incoming state. But maybe there's an edge case?
        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
        # incremental streams running in full refresh.
        component_factory = component_factory or ModelToComponentFactory(
            emit_connector_builder_messages=emit_connector_builder_messages,
            disable_resumable_full_refresh=True,
            connector_state_manager=self._connector_state_manager,
            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
        )

        super().__init__(
            source_config=source_config,
            config=config,
            debug=debug,
            emit_connector_builder_messages=emit_connector_builder_messages,
            component_factory=component_factory,
            config_path=config_path,
        )

        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
        if concurrency_level_from_manifest:
            concurrency_level_component = self._constructor.create_component(
                model_type=ConcurrencyLevelModel,
                component_definition=concurrency_level_from_manifest,
                config=config or {},
            )
            if not isinstance(concurrency_level_component, ConcurrencyLevel):
                raise ValueError(
                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
                )

            concurrency_level = concurrency_level_component.get_concurrency_level()
            initial_number_of_partitions_to_generate = max(
                concurrency_level // 2, 1
            )  # Partition generation iterates using a range based on this value. If this is floored to zero we end up in a deadlock during start up
        else:
            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2

        self._concurrent_source = ConcurrentSource.create(
            num_workers=concurrency_level,
            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
            logger=self.logger,
            slice_logger=self._slice_logger,
            message_repository=self.message_repository,
        )

    # TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state.
    @property
    def is_partially_declarative(self) -> bool:
        """This flag is used to avoid unexpectedly processing AbstractStreamFacade streams as concurrent streams."""
        return False

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        concurrent_streams, _ = self._group_streams(config=config)

        # ConcurrentReadProcessor pops streams that are finished being read, so before syncing, the names of
        # the concurrent streams must be saved so that they can be removed from the catalog before starting
        # synchronous streams
        if len(concurrent_streams) > 0:
            concurrent_stream_names = set(
                [concurrent_stream.name for concurrent_stream in concurrent_streams]
            )

            selected_concurrent_streams = self._select_streams(
                streams=concurrent_streams, configured_catalog=catalog
            )
            # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
            # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
            if selected_concurrent_streams:
                yield from self._concurrent_source.read(selected_concurrent_streams)

            # Sync all streams that are not concurrent compatible. We filter out concurrent streams because the
            # existing AbstractSource.read() implementation iterates over the catalog when syncing streams, many
            # of which were already synced using the Concurrent CDK
            filtered_catalog = self._remove_concurrent_streams_from_catalog(
                catalog=catalog, concurrent_stream_names=concurrent_stream_names
            )
        else:
            filtered_catalog = catalog

        # There is no need to run read for synchronous streams if none exist.
        if not filtered_catalog.streams:
            return

        yield from super().read(logger, config, filtered_catalog, state)

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        concurrent_streams, synchronous_streams = self._group_streams(config=config)
        return AirbyteCatalog(
            streams=[
                stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams
            ]
        )

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        The `streams` method is used as part of the AbstractSource in the following cases:
        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams, so not all of the streams returned by `streams` are actually read)
        Note that `super.streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

        In both cases, we will assume that calling the DeclarativeStream is perfectly fine as the result for these is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
        """
        return super().streams(config)

    def _group_streams(
        self, config: Mapping[str, Any]
    ) -> Tuple[List[AbstractStream], List[Stream]]:
        concurrent_streams: List[AbstractStream] = []
        synchronous_streams: List[Stream] = []

        # Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
        # and this is validated during the initialization of the source.
        streams = self._stream_configs(self._source_config, config) + self._dynamic_stream_configs(
            self._source_config, config
        )

        name_to_stream_mapping = {stream["name"]: stream for stream in streams}

        for declarative_stream in self.streams(config=config):
            # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
            # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
            # so we need to treat them as synchronous

            supports_file_transfer = (
                isinstance(declarative_stream, DeclarativeStream)
                and "file_uploader" in name_to_stream_mapping[declarative_stream.name]
            )

            if (
                isinstance(declarative_stream, DeclarativeStream)
                and name_to_stream_mapping[declarative_stream.name]["type"]
                == "StateDelegatingStream"
            ):
                stream_state = self._connector_state_manager.get_stream_state(
                    stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                )

                name_to_stream_mapping[declarative_stream.name] = (
                    name_to_stream_mapping[declarative_stream.name]["incremental_stream"]
                    if stream_state
                    else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"]
                )

            if isinstance(declarative_stream, DeclarativeStream) and (
                name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
                == "SimpleRetriever"
                or name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
                == "AsyncRetriever"
            ):
                incremental_sync_component_definition = name_to_stream_mapping[
                    declarative_stream.name
                ].get("incremental_sync")

                partition_router_component_definition = (
                    name_to_stream_mapping[declarative_stream.name]
                    .get("retriever", {})
                    .get("partition_router")
                )
                is_without_partition_router_or_cursor = not bool(
                    incremental_sync_component_definition
                ) and not bool(partition_router_component_definition)

                is_substream_without_incremental = (
                    partition_router_component_definition
                    and not incremental_sync_component_definition
                )

                if self._is_concurrent_cursor_incremental_without_partition_routing(
                    declarative_stream, incremental_sync_component_definition
                ):
                    stream_state = self._connector_state_manager.get_stream_state(
                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                    )
                    stream_state = self._migrate_state(declarative_stream, stream_state)

                    retriever = self._get_retriever(declarative_stream, stream_state)

                    if isinstance(declarative_stream.retriever, AsyncRetriever) and isinstance(
                        declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter
                    ):
                        cursor = declarative_stream.retriever.stream_slicer.stream_slicer

                        if not isinstance(cursor, ConcurrentCursor | ConcurrentPerPartitionCursor):
                            # This should never happen since we instantiate ConcurrentCursor in
                            # model_to_component_factory.py
                            raise ValueError(
                                f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received {cursor.__class__}"
                            )

                        partition_generator = StreamSlicerPartitionGenerator(
                            partition_factory=DeclarativePartitionFactory(
                                declarative_stream.name,
                                declarative_stream.get_json_schema(),
                                retriever,
                                self.message_repository,
                            ),
                            stream_slicer=declarative_stream.retriever.stream_slicer,
                        )
                    else:
                        if (
                            incremental_sync_component_definition
                            and incremental_sync_component_definition.get("type")
                            == IncrementingCountCursorModel.__name__
                        ):
                            cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor(
                                model_type=IncrementingCountCursorModel,
                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
                                stream_name=declarative_stream.name,
                                stream_namespace=declarative_stream.namespace,
                                config=config or {},
                            )
                        else:
                            cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
                                model_type=DatetimeBasedCursorModel,
                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
                                stream_name=declarative_stream.name,
                                stream_namespace=declarative_stream.namespace,
                                config=config or {},
                                stream_state_migrations=declarative_stream.state_migrations,
                            )
                        partition_generator = StreamSlicerPartitionGenerator(
                            partition_factory=DeclarativePartitionFactory(
                                declarative_stream.name,
                                declarative_stream.get_json_schema(),
                                retriever,
                                self.message_repository,
                            ),
                            stream_slicer=cursor,
                        )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=cursor.cursor_field.cursor_field_key
                            if hasattr(cursor, "cursor_field")
                            and hasattr(
                                cursor.cursor_field, "cursor_field_key"
                            )  # FIXME this will need to be updated once we do the per partition
                            else None,
                            logger=self.logger,
                            cursor=cursor,
                            supports_file_transfer=supports_file_transfer,
                        )
                    )
                elif (
                    is_substream_without_incremental or is_without_partition_router_or_cursor
                ) and hasattr(declarative_stream.retriever, "stream_slicer"):
                    partition_generator = StreamSlicerPartitionGenerator(
                        DeclarativePartitionFactory(
                            declarative_stream.name,
                            declarative_stream.get_json_schema(),
                            declarative_stream.retriever,
                            self.message_repository,
                        ),
                        declarative_stream.retriever.stream_slicer,
                    )

                    final_state_cursor = FinalStateCursor(
                        stream_name=declarative_stream.name,
                        stream_namespace=declarative_stream.namespace,
                        message_repository=self.message_repository,
                    )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=None,
                            logger=self.logger,
                            cursor=final_state_cursor,
                            supports_file_transfer=supports_file_transfer,
                        )
                    )
                elif (
                    incremental_sync_component_definition
                    and incremental_sync_component_definition.get("type", "")
                    == DatetimeBasedCursorModel.__name__
                    and hasattr(declarative_stream.retriever, "stream_slicer")
                    and isinstance(
                        declarative_stream.retriever.stream_slicer,
                        (GlobalSubstreamCursor, PerPartitionWithGlobalCursor),
                    )
                ):
                    stream_state = self._connector_state_manager.get_stream_state(
                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                    )
                    stream_state = self._migrate_state(declarative_stream, stream_state)

                    partition_router = declarative_stream.retriever.stream_slicer._partition_router

                    perpartition_cursor = (
                        self._constructor.create_concurrent_cursor_from_perpartition_cursor(
                            state_manager=self._connector_state_manager,
                            model_type=DatetimeBasedCursorModel,
                            component_definition=incremental_sync_component_definition,
                            stream_name=declarative_stream.name,
                            stream_namespace=declarative_stream.namespace,
                            config=config or {},
                            stream_state=stream_state,
                            partition_router=partition_router,
                        )
                    )

                    retriever = self._get_retriever(declarative_stream, stream_state)

                    partition_generator = StreamSlicerPartitionGenerator(
                        DeclarativePartitionFactory(
                            declarative_stream.name,
                            declarative_stream.get_json_schema(),
                            retriever,
                            self.message_repository,
                        ),
                        perpartition_cursor,
                    )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
                            logger=self.logger,
                            cursor=perpartition_cursor,
                            supports_file_transfer=supports_file_transfer,
                        )
                    )
                else:
                    synchronous_streams.append(declarative_stream)
            # TODO: Remove this. This check is necessary to safely migrate Stripe during the transition state.
            # Condition below needs to ensure that concurrent support is not lost for sources that already support
            # it before migration, but now are only partially migrated to declarative implementation (e.g., Stripe).
            elif (
                isinstance(declarative_stream, AbstractStreamFacade)
                and self.is_partially_declarative
            ):
                concurrent_streams.append(declarative_stream.get_underlying_stream())
            else:
                synchronous_streams.append(declarative_stream)

        return concurrent_streams, synchronous_streams

    def _is_concurrent_cursor_incremental_without_partition_routing(
        self,
        declarative_stream: DeclarativeStream,
        incremental_sync_component_definition: Mapping[str, Any] | None,
    ) -> bool:
        return (
            incremental_sync_component_definition is not None
            and bool(incremental_sync_component_definition)
            and (
                incremental_sync_component_definition.get("type", "")
                in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__)
            )
            and hasattr(declarative_stream.retriever, "stream_slicer")
            and (
                isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
                # IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor
                # add isinstance check here if we want to create a Declarative IncrementingCountCursor
                # or isinstance(
                #     declarative_stream.retriever.stream_slicer, IncrementingCountCursor
                # )
                or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
            )
        )

    @staticmethod
    def _get_retriever(
        declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
    ) -> Retriever:
        if declarative_stream and isinstance(declarative_stream.retriever, SimpleRetriever):
            # We zero it out here, but since this is a cursor reference, the state is still properly
            # instantiated for the other components that reference it
            declarative_stream.retriever.cursor = None
        return declarative_stream.retriever

    @staticmethod
    def _select_streams(
        streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
    ) -> List[AbstractStream]:
        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
        abstract_streams: List[AbstractStream] = []
        for configured_stream in configured_catalog.streams:
            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
            if stream_instance:
                abstract_streams.append(stream_instance)

        return abstract_streams

    @staticmethod
    def _remove_concurrent_streams_from_catalog(
        catalog: ConfiguredAirbyteCatalog,
        concurrent_stream_names: set[str],
    ) -> ConfiguredAirbyteCatalog:
        return ConfiguredAirbyteCatalog(
            streams=[
                stream
                for stream in catalog.streams
                if stream.stream.name not in concurrent_stream_names
            ]
        )

    @staticmethod
    def _migrate_state(
        declarative_stream: DeclarativeStream, stream_state: MutableMapping[str, Any]
    ) -> MutableMapping[str, Any]:
        for state_migration in declarative_stream.state_migrations:
            if state_migration.should_migrate(stream_state):
                # The state variable is expected to be mutable but the migrate method returns an immutable mapping.
                stream_state = dict(state_migration.migrate(stream_state))

        return stream_state
class
ConcurrentDeclarativeSource(airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource, typing.Generic[~TState]):
Declarative source defined by a manifest of low-code components that define source connector behavior
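A minimal usage sketch follows. The manifest path, manifest contents, and config values are illustrative assumptions (as is the use of PyYAML to load the manifest); only the constructor and discover() signatures come from the source above.

    import logging

    import yaml  # assumes PyYAML is available

    from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
        ConcurrentDeclarativeSource,
    )

    logger = logging.getLogger("airbyte")

    # Hypothetical manifest file describing the low-code streams of the connector.
    with open("manifest.yaml") as f:
        manifest = yaml.safe_load(f)

    # Connector-specific configuration; the key below is illustrative only.
    config = {"api_key": "..."}

    source = ConcurrentDeclarativeSource(
        catalog=None,          # a ConfiguredAirbyteCatalog is only required for read()
        config=config,
        state=[],              # incoming state as a list of AirbyteStateMessage
        source_config=manifest,
    )

    # discover() builds an AirbyteCatalog from both concurrent and synchronous streams.
    catalog = source.discover(logger=logger, config=config)
    print([stream.name for stream in catalog.streams])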
is_partially_declarative: bool
    @property
    def is_partially_declarative(self) -> bool:
        """This flag is used to avoid unexpectedly processing AbstractStreamFacade streams as concurrent streams."""
        return False
This flag is used to avoid unexpectedly processing AbstractStreamFacade streams as concurrent streams.
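A hedged sketch of how a subclass might use this hook, based only on the AbstractStreamFacade branch in _group_streams shown above; the subclass name is hypothetical.

    from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
        ConcurrentDeclarativeSource,
    )


    class SourcePartiallyDeclarative(ConcurrentDeclarativeSource):
        @property
        def is_partially_declarative(self) -> bool:
            # With this returning True, streams that are AbstractStreamFacade instances
            # are grouped as concurrent streams via get_underlying_stream() instead of
            # being treated as synchronous.
            return True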
def
discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:
    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        concurrent_streams, synchronous_streams = self._group_streams(config=config)
        return AirbyteCatalog(
            streams=[
                stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams
            ]
        )
Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
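Continuing the sketch above, the discovered catalog can be turned into a ConfiguredAirbyteCatalog and fed back into read(). The sync modes chosen below are illustrative; the read() signature comes from the source above.

    from airbyte_cdk.models import (
        ConfiguredAirbyteCatalog,
        ConfiguredAirbyteStream,
        DestinationSyncMode,
        SyncMode,
    )

    configured_catalog = ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=stream,
                sync_mode=SyncMode.full_refresh,
                destination_sync_mode=DestinationSyncMode.overwrite,
            )
            for stream in catalog.streams  # `catalog` as returned by discover()
        ]
    )

    # Concurrent-compatible streams are read through the ConcurrentSource first; any
    # remaining synchronous streams are read via the parent ManifestDeclarativeSource.read().
    for message in source.read(logger=logger, config=config, catalog=configured_catalog, state=[]):
        print(message.type)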