airbyte_cdk.sources.declarative.concurrent_declarative_source
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple

from airbyte_cdk.models import (
    AirbyteCatalog,
    AirbyteMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
)
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.extractors import RecordSelector
from airbyte_cdk.sources.declarative.extractors.record_filter import (
    ClientSideIncrementalRecordFilterDecorator,
)
from airbyte_cdk.sources.declarative.incremental import (
    ConcurrentPerPartitionCursor,
    GlobalSubstreamCursor,
)
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
    PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.models import FileUploader
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    ConcurrencyLevel as ConcurrencyLevelModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DatetimeBasedCursor as DatetimeBasedCursorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    IncrementingCountCursor as IncrementingCountCursorModel,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
    DeclarativePartitionFactory,
    StreamSlicerPartitionGenerator,
)
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
from airbyte_cdk.sources.source import TState
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
    AlwaysAvailableAvailabilityStrategy,
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream


class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]):
    # By default, we defer to a value of 2. A value lower than that could cause a PartitionEnqueuer to be stuck
    # in a state of deadlock because it has hit the limit of futures but no partition reader is consuming them.
    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2

    def __init__(
        self,
        catalog: Optional[ConfiguredAirbyteCatalog],
        config: Optional[Mapping[str, Any]],
        state: TState,
        source_config: ConnectionDefinition,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        component_factory: Optional[ModelToComponentFactory] = None,
        config_path: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
        # no longer needs to store the original incoming state. But maybe there's an edge case?
        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
        # incremental streams running in full refresh.
        component_factory = component_factory or ModelToComponentFactory(
            emit_connector_builder_messages=emit_connector_builder_messages,
            disable_resumable_full_refresh=True,
            connector_state_manager=self._connector_state_manager,
            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
        )

        super().__init__(
            source_config=source_config,
            config=config,
            debug=debug,
            emit_connector_builder_messages=emit_connector_builder_messages,
            component_factory=component_factory,
            config_path=config_path,
        )

        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
        if concurrency_level_from_manifest:
            concurrency_level_component = self._constructor.create_component(
                model_type=ConcurrencyLevelModel,
                component_definition=concurrency_level_from_manifest,
                config=config or {},
            )
            if not isinstance(concurrency_level_component, ConcurrencyLevel):
                raise ValueError(
                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
                )

            concurrency_level = concurrency_level_component.get_concurrency_level()
            initial_number_of_partitions_to_generate = max(
                concurrency_level // 2, 1
            )  # Partition generation iterates using a range based on this value. If this is floored to zero we end up in a deadlock during startup
        else:
            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2

        self._concurrent_source = ConcurrentSource.create(
            num_workers=concurrency_level,
            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
            logger=self.logger,
            slice_logger=self._slice_logger,
            message_repository=self.message_repository,
        )

    # TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state.
    @property
    def is_partially_declarative(self) -> bool:
        """This flag is used to avoid unexpected AbstractStreamFacade processing as concurrent streams."""
        return False

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        concurrent_streams, _ = self._group_streams(config=config)

        # ConcurrentReadProcessor pops streams that are finished being read, so before syncing, the names of
        # the concurrent streams must be saved so that they can be removed from the catalog before starting
        # synchronous streams.
        if len(concurrent_streams) > 0:
            concurrent_stream_names = set(
                [concurrent_stream.name for concurrent_stream in concurrent_streams]
            )

            selected_concurrent_streams = self._select_streams(
                streams=concurrent_streams, configured_catalog=catalog
            )
            # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
            # This is also evident in concurrent_source_adapter.py, so I'll leave this out of scope to fix for now.
            if selected_concurrent_streams:
                yield from self._concurrent_source.read(selected_concurrent_streams)

            # Sync all streams that are not concurrent compatible. We filter out concurrent streams because the
            # existing AbstractSource.read() implementation iterates over the catalog when syncing streams, many
            # of which were already synced using the Concurrent CDK.
            filtered_catalog = self._remove_concurrent_streams_from_catalog(
                catalog=catalog, concurrent_stream_names=concurrent_stream_names
            )
        else:
            filtered_catalog = catalog

        # There is no need to run read for synchronous streams if none exist.
        if not filtered_catalog.streams:
            return

        yield from super().read(logger, config, filtered_catalog, state)

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        concurrent_streams, synchronous_streams = self._group_streams(config=config)
        return AirbyteCatalog(
            streams=[
                stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams
            ]
        )

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        The `streams` method is used as part of the AbstractSource in the following cases:
        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams, so not all streams returned by `streams` are actually read)
        Note that `super().streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

        In both cases, we will assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
        """
        return super().streams(config)

    def _group_streams(
        self, config: Mapping[str, Any]
    ) -> Tuple[List[AbstractStream], List[Stream]]:
        concurrent_streams: List[AbstractStream] = []
        synchronous_streams: List[Stream] = []

        # Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
        # and this is validated during the initialization of the source.
        streams = self._stream_configs(self._source_config, config) + self._dynamic_stream_configs(
            self._source_config, config
        )

        name_to_stream_mapping = {stream["name"]: stream for stream in streams}

        for declarative_stream in self.streams(config=config):
            # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
            # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
            # so we need to treat them as synchronous.

            supports_file_transfer = (
                isinstance(declarative_stream, DeclarativeStream)
                and "file_uploader" in name_to_stream_mapping[declarative_stream.name]
            )

            if (
                isinstance(declarative_stream, DeclarativeStream)
                and name_to_stream_mapping[declarative_stream.name]["type"]
                == "StateDelegatingStream"
            ):
                stream_state = self._connector_state_manager.get_stream_state(
                    stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                )

                name_to_stream_mapping[declarative_stream.name] = (
                    name_to_stream_mapping[declarative_stream.name]["incremental_stream"]
                    if stream_state
                    else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"]
                )

            if isinstance(declarative_stream, DeclarativeStream) and (
                name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
                == "SimpleRetriever"
                or name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
                == "AsyncRetriever"
            ):
                incremental_sync_component_definition = name_to_stream_mapping[
                    declarative_stream.name
                ].get("incremental_sync")

                partition_router_component_definition = (
                    name_to_stream_mapping[declarative_stream.name]
                    .get("retriever", {})
                    .get("partition_router")
                )
                is_without_partition_router_or_cursor = not bool(
                    incremental_sync_component_definition
                ) and not bool(partition_router_component_definition)

                is_substream_without_incremental = (
                    partition_router_component_definition
                    and not incremental_sync_component_definition
                )

                if self._is_concurrent_cursor_incremental_without_partition_routing(
                    declarative_stream, incremental_sync_component_definition
                ):
                    stream_state = self._connector_state_manager.get_stream_state(
                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                    )
                    stream_state = self._migrate_state(declarative_stream, stream_state)

                    retriever = self._get_retriever(declarative_stream, stream_state)

                    if isinstance(declarative_stream.retriever, AsyncRetriever) and isinstance(
                        declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter
                    ):
                        cursor = declarative_stream.retriever.stream_slicer.stream_slicer

                        if not isinstance(cursor, ConcurrentCursor | ConcurrentPerPartitionCursor):
                            # This should never happen since we instantiate ConcurrentCursor in
                            # model_to_component_factory.py
                            raise ValueError(
                                f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received {cursor.__class__}"
                            )

                        partition_generator = StreamSlicerPartitionGenerator(
                            partition_factory=DeclarativePartitionFactory(
                                declarative_stream.name,
                                declarative_stream.get_json_schema(),
                                retriever,
                                self.message_repository,
                            ),
                            stream_slicer=declarative_stream.retriever.stream_slicer,
                        )
                    else:
                        if (
                            incremental_sync_component_definition
                            and incremental_sync_component_definition.get("type")
                            == IncrementingCountCursorModel.__name__
                        ):
                            cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor(
                                model_type=IncrementingCountCursorModel,
                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
                                stream_name=declarative_stream.name,
                                stream_namespace=declarative_stream.namespace,
                                config=config or {},
                            )
                        else:
                            cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
                                model_type=DatetimeBasedCursorModel,
                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
                                stream_name=declarative_stream.name,
                                stream_namespace=declarative_stream.namespace,
                                config=config or {},
                                stream_state_migrations=declarative_stream.state_migrations,
                            )
                        partition_generator = StreamSlicerPartitionGenerator(
                            partition_factory=DeclarativePartitionFactory(
                                declarative_stream.name,
                                declarative_stream.get_json_schema(),
                                retriever,
                                self.message_repository,
                            ),
                            stream_slicer=cursor,
                        )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=cursor.cursor_field.cursor_field_key
                            if hasattr(cursor, "cursor_field")
                            and hasattr(
                                cursor.cursor_field, "cursor_field_key"
                            )  # FIXME this will need to be updated once we do the per partition
                            else None,
                            logger=self.logger,
                            cursor=cursor,
                            supports_file_transfer=supports_file_transfer,
                        )
                    )
                elif (
                    is_substream_without_incremental or is_without_partition_router_or_cursor
                ) and hasattr(declarative_stream.retriever, "stream_slicer"):
                    partition_generator = StreamSlicerPartitionGenerator(
                        DeclarativePartitionFactory(
                            declarative_stream.name,
                            declarative_stream.get_json_schema(),
                            declarative_stream.retriever,
                            self.message_repository,
                        ),
                        declarative_stream.retriever.stream_slicer,
                    )

                    final_state_cursor = FinalStateCursor(
                        stream_name=declarative_stream.name,
                        stream_namespace=declarative_stream.namespace,
                        message_repository=self.message_repository,
                    )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=None,
                            logger=self.logger,
                            cursor=final_state_cursor,
                            supports_file_transfer=supports_file_transfer,
                        )
                    )
                elif (
                    incremental_sync_component_definition
                    and incremental_sync_component_definition.get("type", "")
                    == DatetimeBasedCursorModel.__name__
                    and hasattr(declarative_stream.retriever, "stream_slicer")
                    and isinstance(
                        declarative_stream.retriever.stream_slicer,
                        (GlobalSubstreamCursor, PerPartitionWithGlobalCursor),
                    )
                ):
                    stream_state = self._connector_state_manager.get_stream_state(
                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                    )
                    stream_state = self._migrate_state(declarative_stream, stream_state)

                    partition_router = declarative_stream.retriever.stream_slicer._partition_router

                    perpartition_cursor = (
                        self._constructor.create_concurrent_cursor_from_perpartition_cursor(
                            state_manager=self._connector_state_manager,
                            model_type=DatetimeBasedCursorModel,
                            component_definition=incremental_sync_component_definition,
                            stream_name=declarative_stream.name,
                            stream_namespace=declarative_stream.namespace,
                            config=config or {},
                            stream_state=stream_state,
                            partition_router=partition_router,
                        )
                    )

                    retriever = self._get_retriever(declarative_stream, stream_state)

                    partition_generator = StreamSlicerPartitionGenerator(
                        DeclarativePartitionFactory(
                            declarative_stream.name,
                            declarative_stream.get_json_schema(),
                            retriever,
                            self.message_repository,
                        ),
                        perpartition_cursor,
                    )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
                            logger=self.logger,
                            cursor=perpartition_cursor,
                            supports_file_transfer=supports_file_transfer,
                        )
                    )
                else:
                    synchronous_streams.append(declarative_stream)
            # TODO: Remove this. This check is necessary to safely migrate Stripe during the transition state.
            # The condition below needs to ensure that concurrent support is not lost for sources that already
            # supported it before migration, but are now only partially migrated to the declarative implementation (e.g., Stripe).
            elif (
                isinstance(declarative_stream, AbstractStreamFacade)
                and self.is_partially_declarative
            ):
                concurrent_streams.append(declarative_stream.get_underlying_stream())
            else:
                synchronous_streams.append(declarative_stream)

        return concurrent_streams, synchronous_streams

    def _is_concurrent_cursor_incremental_without_partition_routing(
        self,
        declarative_stream: DeclarativeStream,
        incremental_sync_component_definition: Mapping[str, Any] | None,
    ) -> bool:
        return (
            incremental_sync_component_definition is not None
            and bool(incremental_sync_component_definition)
            and (
                incremental_sync_component_definition.get("type", "")
                in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__)
            )
            and hasattr(declarative_stream.retriever, "stream_slicer")
            and (
                isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
                # IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor;
                # add an isinstance check here if we want to create a Declarative IncrementingCountCursor
                # or isinstance(
                #     declarative_stream.retriever.stream_slicer, IncrementingCountCursor
                # )
                or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
            )
        )

    @staticmethod
    def _get_retriever(
        declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
    ) -> Retriever:
        retriever = declarative_stream.retriever

        # This is an optimization so that we don't invoke any cursor or state management flows within the
        # low-code framework because state management is handled through the ConcurrentCursor.
        if declarative_stream and isinstance(retriever, SimpleRetriever):
            # Also a temporary hack. In the legacy Stream implementation, as part of the read,
            # set_initial_state() is called to instantiate incoming state on the cursor. Although we no
            # longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
            # like StopConditionPaginationStrategyDecorator still rely on a DatetimeBasedCursor that is
            # properly initialized with state.
            if retriever.cursor:
                retriever.cursor.set_initial_state(stream_state=stream_state)

            # Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance
            # from the one initialized on the SimpleRetriever, so it must also have state initialized
            # for semi-incremental streams using is_client_side_incremental to filter properly.
            if isinstance(retriever.record_selector, RecordSelector) and isinstance(
                retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
            ):
                retriever.record_selector.record_filter._cursor.set_initial_state(
                    stream_state=stream_state
                )  # type: ignore  # After non-concurrent cursors are deprecated we can remove these cursor workarounds

            # We zero it out here, but since this is a cursor reference, the state is still properly
            # instantiated for the other components that reference it.
            retriever.cursor = None

        return retriever

    @staticmethod
    def _select_streams(
        streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
    ) -> List[AbstractStream]:
        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
        abstract_streams: List[AbstractStream] = []
        for configured_stream in configured_catalog.streams:
            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
            if stream_instance:
                abstract_streams.append(stream_instance)

        return abstract_streams

    @staticmethod
    def _remove_concurrent_streams_from_catalog(
        catalog: ConfiguredAirbyteCatalog,
        concurrent_stream_names: set[str],
    ) -> ConfiguredAirbyteCatalog:
        return ConfiguredAirbyteCatalog(
            streams=[
                stream
                for stream in catalog.streams
                if stream.stream.name not in concurrent_stream_names
            ]
        )

    @staticmethod
    def _migrate_state(
        declarative_stream: DeclarativeStream, stream_state: MutableMapping[str, Any]
    ) -> MutableMapping[str, Any]:
        for state_migration in declarative_stream.state_migrations:
            if state_migration.should_migrate(stream_state):
                # The state variable is expected to be mutable but the migrate method returns an immutable mapping.
                stream_state = dict(state_migration.migrate(stream_state))

        return stream_state
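As shown in __init__ above, the source reads an optional "concurrency_level" component from the manifest to size the concurrent worker pool, falling back to _LOWEST_SAFE_CONCURRENCY_LEVEL when none is provided. A minimal sketch of what such a manifest entry might look like; the values below are illustrative assumptions, not defaults taken from the CDK:

# Hypothetical fragment of a declarative manifest (values are illustrative only).
manifest = {
    # ... other manifest components (version, streams, check, ...) ...
    "concurrency_level": {
        "type": "ConcurrencyLevel",
        "default_concurrency": 4,  # number of workers requested by this connector
        "max_concurrency": 20,     # optional cap on the number of workers
    },
}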
class ConcurrentDeclarativeSource(airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource, typing.Generic[~TState]):
Declarative source defined by a manifest of low-code components that define source connector behavior
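A minimal usage sketch follows. The manifest, config, and stream definition below are hypothetical placeholders meant only to illustrate the constructor arguments; a production connector ships its own manifest and is normally driven through the Airbyte entrypoint rather than called directly.

import logging

from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
)

# Hypothetical declarative manifest; field names follow the declarative component
# schema, but the stream itself is a toy example.
manifest = {
    "version": "6.0.0",
    "check": {"type": "CheckStream", "stream_names": ["users"]},
    "streams": [
        {
            "type": "DeclarativeStream",
            "name": "users",
            "primary_key": ["id"],
            "schema_loader": {
                "type": "InlineSchemaLoader",
                "schema": {"type": "object", "properties": {}},
            },
            "retriever": {
                "type": "SimpleRetriever",
                "requester": {
                    "type": "HttpRequester",
                    "url_base": "https://api.example.com",
                    "path": "/users",
                    "http_method": "GET",
                },
                "record_selector": {
                    "type": "RecordSelector",
                    "extractor": {"type": "DpathExtractor", "field_path": []},
                },
            },
        }
    ],
}
config = {"api_key": "..."}  # connector configuration matching the manifest's spec

source = ConcurrentDeclarativeSource(
    catalog=None,            # a configured catalog is only required for read()
    config=config,
    state=[],                # incoming state, as a list of AirbyteStateMessage
    source_config=manifest,
)

logger = logging.getLogger("airbyte")
# discover() lists the available streams; a platform would turn this into a
# ConfiguredAirbyteCatalog before calling read().
print([stream.name for stream in source.discover(logger, config).streams])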
is_partially_declarative: bool
    @property
    def is_partially_declarative(self) -> bool:
        """This flag is used to avoid unexpected AbstractStreamFacade processing as concurrent streams."""
        return False
This flag is used to avoid unexpected AbstractStreamFacade processing as concurrent streams.
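For a source that is only partially migrated to the declarative framework, a subclass can override this flag so that its remaining legacy streams keep concurrent processing. A hedged sketch (the subclass name is hypothetical, and ConcurrentDeclarativeSource is assumed to be imported as in the earlier example):

class SourcePartiallyMigrated(ConcurrentDeclarativeSource):
    @property
    def is_partially_declarative(self) -> bool:
        # With this flag enabled, _group_streams() routes AbstractStreamFacade
        # instances into the concurrent list via get_underlying_stream() instead
        # of treating them as synchronous streams.
        return True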
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:
    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        concurrent_streams, synchronous_streams = self._group_streams(config=config)
        return AirbyteCatalog(
            streams=[
                stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams
            ]
        )
Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
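A short sketch of calling discover directly, assuming `source` and `config` were constructed as in the earlier example:

import logging

catalog = source.discover(logging.getLogger("airbyte"), config)
for stream in catalog.streams:
    # Both concurrent-compatible and synchronous streams are grouped by
    # _group_streams() and included in the resulting AirbyteCatalog.
    print(stream.name, stream.supported_sync_modes)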