airbyte_cdk.sources.declarative.concurrent_declarative_source
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple

from airbyte_cdk.models import (
    AirbyteCatalog,
    AirbyteMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
)
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.extractors import RecordSelector
from airbyte_cdk.sources.declarative.extractors.record_filter import (
    ClientSideIncrementalRecordFilterDecorator,
)
from airbyte_cdk.sources.declarative.incremental import (
    ConcurrentPerPartitionCursor,
    GlobalSubstreamCursor,
)
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
    PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    ConcurrencyLevel as ConcurrencyLevelModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DatetimeBasedCursor as DatetimeBasedCursorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    IncrementingCountCursor as IncrementingCountCursorModel,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
    DeclarativePartitionFactory,
    StreamSlicerPartitionGenerator,
)
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
from airbyte_cdk.sources.source import TState
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
    AlwaysAvailableAvailabilityStrategy,
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream


class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]):
    # By default, we defer to a value of 2. A value lower than that could cause a PartitionEnqueuer to be stuck
    # in a state of deadlock because it has hit the limit of futures while no partition reader is consuming them.
    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2

    def __init__(
        self,
        catalog: Optional[ConfiguredAirbyteCatalog],
        config: Optional[Mapping[str, Any]],
        state: TState,
        source_config: ConnectionDefinition,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        component_factory: Optional[ModelToComponentFactory] = None,
        **kwargs: Any,
    ) -> None:
        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
        #  no longer needs to store the original incoming state. But maybe there's an edge case?
        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
        # incremental streams running in full refresh.
        component_factory = component_factory or ModelToComponentFactory(
            emit_connector_builder_messages=emit_connector_builder_messages,
            disable_resumable_full_refresh=True,
            connector_state_manager=self._connector_state_manager,
        )

        super().__init__(
            source_config=source_config,
            config=config,
            debug=debug,
            emit_connector_builder_messages=emit_connector_builder_messages,
            component_factory=component_factory,
        )

        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
        if concurrency_level_from_manifest:
            concurrency_level_component = self._constructor.create_component(
                model_type=ConcurrencyLevelModel,
                component_definition=concurrency_level_from_manifest,
                config=config or {},
            )
            if not isinstance(concurrency_level_component, ConcurrencyLevel):
                raise ValueError(
                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
                )

            concurrency_level = concurrency_level_component.get_concurrency_level()
            initial_number_of_partitions_to_generate = max(
                concurrency_level // 2, 1
            )  # Partition generation iterates using a range based on this value. If this is floored to zero, we end up in a deadlock during startup.
        else:
            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2

        self._concurrent_source = ConcurrentSource.create(
            num_workers=concurrency_level,
            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
            logger=self.logger,
            slice_logger=self._slice_logger,
            message_repository=self.message_repository,
        )

    # TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state.
    @property
    def is_partially_declarative(self) -> bool:
        """This flag is used to avoid unexpectedly processing AbstractStreamFacade instances as concurrent streams."""
        return False

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        concurrent_streams, _ = self._group_streams(config=config)

        # ConcurrentReadProcessor pops streams that are finished being read, so before syncing, the names of
        # the concurrent streams must be saved so that they can be removed from the catalog before starting
        # synchronous streams.
        if len(concurrent_streams) > 0:
            concurrent_stream_names = set(
                [concurrent_stream.name for concurrent_stream in concurrent_streams]
            )

            selected_concurrent_streams = self._select_streams(
                streams=concurrent_streams, configured_catalog=catalog
            )
            # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
            # This is also evident in concurrent_source_adapter.py, so I'll leave this out of scope to fix for now.
            if selected_concurrent_streams:
                yield from self._concurrent_source.read(selected_concurrent_streams)

            # Sync all streams that are not concurrent compatible. We filter out concurrent streams because the
            # existing AbstractSource.read() implementation iterates over the catalog when syncing streams, many
            # of which were already synced using the Concurrent CDK.
            filtered_catalog = self._remove_concurrent_streams_from_catalog(
                catalog=catalog, concurrent_stream_names=concurrent_stream_names
            )
        else:
            filtered_catalog = catalog

        # There is no need to run read for synchronous streams if none exist.
        if not filtered_catalog.streams:
            return

        yield from super().read(logger, config, filtered_catalog, state)

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        concurrent_streams, synchronous_streams = self._group_streams(config=config)
        return AirbyteCatalog(
            streams=[
                stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams
            ]
        )

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        The `streams` method is used as part of the AbstractSource in the following cases:
        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams, so we don't actually read from all the streams returned by `streams`)
        Note that `super().streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

        In both cases, we will assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a (concurrent) DefaultStream. This should simply be removed once we have moved away from the code paths mentioned above.
        """
        return super().streams(config)

    def _group_streams(
        self, config: Mapping[str, Any]
    ) -> Tuple[List[AbstractStream], List[Stream]]:
        concurrent_streams: List[AbstractStream] = []
        synchronous_streams: List[Stream] = []

        # Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
        # and this is validated during the initialization of the source.
        streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
            self._source_config, config
        )

        name_to_stream_mapping = {stream["name"]: stream for stream in streams}

        for declarative_stream in self.streams(config=config):
            # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
            # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
            # so we need to treat them as synchronous.

            if (
                isinstance(declarative_stream, DeclarativeStream)
                and name_to_stream_mapping[declarative_stream.name]["type"]
                == "StateDelegatingStream"
            ):
                stream_state = self._connector_state_manager.get_stream_state(
                    stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                )

                name_to_stream_mapping[declarative_stream.name] = (
                    name_to_stream_mapping[declarative_stream.name]["incremental_stream"]
                    if stream_state
                    else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"]
                )

            if isinstance(declarative_stream, DeclarativeStream) and (
                name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
                == "SimpleRetriever"
                or name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
                == "AsyncRetriever"
            ):
                incremental_sync_component_definition = name_to_stream_mapping[
                    declarative_stream.name
                ].get("incremental_sync")

                partition_router_component_definition = (
                    name_to_stream_mapping[declarative_stream.name]
                    .get("retriever", {})
                    .get("partition_router")
                )
                is_without_partition_router_or_cursor = not bool(
                    incremental_sync_component_definition
                ) and not bool(partition_router_component_definition)

                is_substream_without_incremental = (
                    partition_router_component_definition
                    and not incremental_sync_component_definition
                )

                if self._is_concurrent_cursor_incremental_without_partition_routing(
                    declarative_stream, incremental_sync_component_definition
                ):
                    stream_state = self._connector_state_manager.get_stream_state(
                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                    )
                    stream_state = self._migrate_state(declarative_stream, stream_state)

                    retriever = self._get_retriever(declarative_stream, stream_state)

                    if isinstance(declarative_stream.retriever, AsyncRetriever) and isinstance(
                        declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter
                    ):
                        cursor = declarative_stream.retriever.stream_slicer.stream_slicer

                        if not isinstance(cursor, ConcurrentCursor | ConcurrentPerPartitionCursor):
                            # This should never happen since we instantiate ConcurrentCursor in
                            # model_to_component_factory.py
                            raise ValueError(
                                f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received {cursor.__class__}"
                            )

                        partition_generator = StreamSlicerPartitionGenerator(
                            partition_factory=DeclarativePartitionFactory(
                                declarative_stream.name,
                                declarative_stream.get_json_schema(),
                                retriever,
                                self.message_repository,
                            ),
                            stream_slicer=declarative_stream.retriever.stream_slicer,
                        )
                    else:
                        if (
                            incremental_sync_component_definition
                            and incremental_sync_component_definition.get("type")
                            == IncrementingCountCursorModel.__name__
                        ):
                            cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor(
                                model_type=IncrementingCountCursorModel,
                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
                                stream_name=declarative_stream.name,
                                stream_namespace=declarative_stream.namespace,
                                config=config or {},
                            )
                        else:
                            cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
                                model_type=DatetimeBasedCursorModel,
                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
                                stream_name=declarative_stream.name,
                                stream_namespace=declarative_stream.namespace,
                                config=config or {},
                            )
                        partition_generator = StreamSlicerPartitionGenerator(
                            partition_factory=DeclarativePartitionFactory(
                                declarative_stream.name,
                                declarative_stream.get_json_schema(),
                                retriever,
                                self.message_repository,
                            ),
                            stream_slicer=cursor,
                        )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=cursor.cursor_field.cursor_field_key
                            if hasattr(cursor, "cursor_field")
                            and hasattr(
                                cursor.cursor_field, "cursor_field_key"
                            )  # FIXME this will need to be updated once we do the per partition
                            else None,
                            logger=self.logger,
                            cursor=cursor,
                        )
                    )
                elif (
                    is_substream_without_incremental or is_without_partition_router_or_cursor
                ) and hasattr(declarative_stream.retriever, "stream_slicer"):
                    partition_generator = StreamSlicerPartitionGenerator(
                        DeclarativePartitionFactory(
                            declarative_stream.name,
                            declarative_stream.get_json_schema(),
                            declarative_stream.retriever,
                            self.message_repository,
                        ),
                        declarative_stream.retriever.stream_slicer,
                    )

                    final_state_cursor = FinalStateCursor(
                        stream_name=declarative_stream.name,
                        stream_namespace=declarative_stream.namespace,
                        message_repository=self.message_repository,
                    )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=None,
                            logger=self.logger,
                            cursor=final_state_cursor,
                        )
                    )
                elif (
                    incremental_sync_component_definition
                    and incremental_sync_component_definition.get("type", "")
                    == DatetimeBasedCursorModel.__name__
                    and hasattr(declarative_stream.retriever, "stream_slicer")
                    and isinstance(
                        declarative_stream.retriever.stream_slicer,
                        (GlobalSubstreamCursor, PerPartitionWithGlobalCursor),
                    )
                ):
                    stream_state = self._connector_state_manager.get_stream_state(
                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                    )
                    stream_state = self._migrate_state(declarative_stream, stream_state)

                    partition_router = declarative_stream.retriever.stream_slicer._partition_router

                    perpartition_cursor = (
                        self._constructor.create_concurrent_cursor_from_perpartition_cursor(
                            state_manager=self._connector_state_manager,
                            model_type=DatetimeBasedCursorModel,
                            component_definition=incremental_sync_component_definition,
                            stream_name=declarative_stream.name,
                            stream_namespace=declarative_stream.namespace,
                            config=config or {},
                            stream_state=stream_state,
                            partition_router=partition_router,
                        )
                    )

                    retriever = self._get_retriever(declarative_stream, stream_state)

                    partition_generator = StreamSlicerPartitionGenerator(
                        DeclarativePartitionFactory(
                            declarative_stream.name,
                            declarative_stream.get_json_schema(),
                            retriever,
                            self.message_repository,
                        ),
                        perpartition_cursor,
                    )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
                            logger=self.logger,
                            cursor=perpartition_cursor,
                        )
                    )
                else:
                    synchronous_streams.append(declarative_stream)
            # TODO: Remove this. This check is necessary to safely migrate Stripe during the transition state.
            # The condition below needs to ensure that concurrent support is not lost for sources that already
            # supported it before migration but are now only partially migrated to the declarative implementation
            # (e.g., Stripe).
            elif (
                isinstance(declarative_stream, AbstractStreamFacade)
                and self.is_partially_declarative
            ):
                concurrent_streams.append(declarative_stream.get_underlying_stream())
            else:
                synchronous_streams.append(declarative_stream)

        return concurrent_streams, synchronous_streams

    def _is_concurrent_cursor_incremental_without_partition_routing(
        self,
        declarative_stream: DeclarativeStream,
        incremental_sync_component_definition: Mapping[str, Any] | None,
    ) -> bool:
        return (
            incremental_sync_component_definition is not None
            and bool(incremental_sync_component_definition)
            and (
                incremental_sync_component_definition.get("type", "")
                in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__)
            )
            and hasattr(declarative_stream.retriever, "stream_slicer")
            and (
                isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
                # IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor;
                # add an isinstance check here if we want to create a declarative IncrementingCountCursor
                # or isinstance(
                #     declarative_stream.retriever.stream_slicer, IncrementingCountCursor
                # )
                or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
            )
        )

    @staticmethod
    def _get_retriever(
        declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
    ) -> Retriever:
        retriever = declarative_stream.retriever

        # This is an optimization so that we don't invoke any cursor or state management flows within the
        # low-code framework because state management is handled through the ConcurrentCursor.
        if declarative_stream and isinstance(retriever, SimpleRetriever):
            # Also a temporary hack. In the legacy Stream implementation, as part of the read,
            # set_initial_state() is called to instantiate incoming state on the cursor. Although we no
            # longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
            # like StopConditionPaginationStrategyDecorator still rely on a DatetimeBasedCursor that is
            # properly initialized with state.
            if retriever.cursor:
                retriever.cursor.set_initial_state(stream_state=stream_state)

            # Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance
            # from the one initialized on the SimpleRetriever, so it must also have state initialized
            # for semi-incremental streams using is_client_side_incremental to filter properly.
            if isinstance(retriever.record_selector, RecordSelector) and isinstance(
                retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
            ):
                retriever.record_selector.record_filter._cursor.set_initial_state(
                    stream_state=stream_state
                )  # type: ignore  # After non-concurrent cursors are deprecated we can remove these cursor workarounds

            # We zero it out here, but since this is a cursor reference, the state is still properly
            # instantiated for the other components that reference it.
            retriever.cursor = None

        return retriever

    @staticmethod
    def _select_streams(
        streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
    ) -> List[AbstractStream]:
        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
        abstract_streams: List[AbstractStream] = []
        for configured_stream in configured_catalog.streams:
            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
            if stream_instance:
                abstract_streams.append(stream_instance)

        return abstract_streams

    @staticmethod
    def _remove_concurrent_streams_from_catalog(
        catalog: ConfiguredAirbyteCatalog,
        concurrent_stream_names: set[str],
    ) -> ConfiguredAirbyteCatalog:
        return ConfiguredAirbyteCatalog(
            streams=[
                stream
                for stream in catalog.streams
                if stream.stream.name not in concurrent_stream_names
            ]
        )

    @staticmethod
    def _migrate_state(
        declarative_stream: DeclarativeStream, stream_state: MutableMapping[str, Any]
    ) -> MutableMapping[str, Any]:
        for state_migration in declarative_stream.state_migrations:
            if state_migration.should_migrate(stream_state):
                # The state variable is expected to be mutable, but the migrate method returns an immutable mapping.
                stream_state = dict(state_migration.migrate(stream_state))

        return stream_state
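To illustrate how `__init__` derives its worker count, here is a minimal, hypothetical manifest fragment and the numbers the constructor would compute from it. Only the `concurrency_level` entry matters for this computation; a real manifest must also define `streams`, `check`, and the other required components.

# Hypothetical manifest fragment; field names follow the declarative component schema.
manifest_fragment = {
    "concurrency_level": {
        "type": "ConcurrencyLevel",
        "default_concurrency": "{{ config.get('num_workers', 4) }}",
        "max_concurrency": 25,
    },
}

# With config={"num_workers": 8}, get_concurrency_level() resolves to 8, so the
# source creates 8 workers and max(8 // 2, 1) == 4 initial partition generators.
# Without a concurrency_level in the manifest, the source falls back to
# _LOWEST_SAFE_CONCURRENCY_LEVEL == 2 workers and 2 // 2 == 1 initial partition.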
class ConcurrentDeclarativeSource(airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource, typing.Generic[~TState]):
Declarative source defined by a manifest of low-code components that define source connector behavior
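A brief usage sketch. The names `manifest`, `config`, `configured_catalog`, and `state_messages` are placeholders for valid connector inputs, not values defined by this module.

import logging

from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
)

logger = logging.getLogger("airbyte")

source = ConcurrentDeclarativeSource(
    catalog=configured_catalog,   # Optional[ConfiguredAirbyteCatalog]
    config=config,                # connector configuration mapping
    state=state_messages,         # List[AirbyteStateMessage]
    source_config=manifest,       # the low-code manifest (ConnectionDefinition)
)

# read() first syncs the concurrent-compatible streams through the internal
# ConcurrentSource, then falls back to the synchronous code path for the rest.
for message in source.read(logger, config, configured_catalog, state_messages):
    print(message.type)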
is_partially_declarative: bool
@property
def is_partially_declarative(self) -> bool:
    """This flag is used to avoid unexpectedly processing AbstractStreamFacade instances as concurrent streams."""
    return False
This flag is used to avoid unexpectedly processing AbstractStreamFacade instances as concurrent streams.
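For a source that is only partially migrated to the declarative framework (the transition case the TODO in the source refers to), a subclass can opt in by overriding the flag. A hypothetical sketch:

class PartiallyDeclarativeSource(ConcurrentDeclarativeSource[List[AirbyteStateMessage]]):
    @property
    def is_partially_declarative(self) -> bool:
        # With this override, _group_streams routes wrapped legacy streams
        # (AbstractStreamFacade instances) to the concurrent code path via
        # get_underlying_stream() instead of treating them as synchronous.
        return True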
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
    concurrent_streams, synchronous_streams = self._group_streams(config=config)
    return AirbyteCatalog(
        streams=[
            stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams
        ]
    )
Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
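A minimal call sketch, assuming `source` and `config` were set up as in the class-level example above:

catalog = source.discover(logger=logging.getLogger("airbyte"), config=config)

# Concurrent and synchronous streams are merged into a single AirbyteCatalog.
for stream in catalog.streams:
    print(stream.name, stream.supported_sync_modes)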