airbyte_cdk.sources.declarative.concurrent_declarative_source
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple

from airbyte_cdk.models import (
    AirbyteCatalog,
    AirbyteMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
)
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.extractors import RecordSelector
from airbyte_cdk.sources.declarative.extractors.record_filter import (
    ClientSideIncrementalRecordFilterDecorator,
)
from airbyte_cdk.sources.declarative.incremental import ConcurrentPerPartitionCursor
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
    PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    ConcurrencyLevel as ConcurrencyLevelModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DatetimeBasedCursor as DatetimeBasedCursorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    IncrementingCountCursor as IncrementingCountCursorModel,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
    DeclarativePartitionFactory,
    StreamSlicerPartitionGenerator,
)
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
from airbyte_cdk.sources.source import TState
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
    AlwaysAvailableAvailabilityStrategy,
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream


class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]):
    # By default, we defer to a value of 2. A value lower than that could cause a PartitionEnqueuer to be stuck in a
    # state of deadlock because it has hit the limit of futures but no partition reader is consuming them.
    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2

    def __init__(
        self,
        catalog: Optional[ConfiguredAirbyteCatalog],
        config: Optional[Mapping[str, Any]],
        state: TState,
        source_config: ConnectionDefinition,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        component_factory: Optional[ModelToComponentFactory] = None,
        **kwargs: Any,
    ) -> None:
        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
        # no longer needs to store the original incoming state. But maybe there's an edge case?
        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
        # incremental streams running in full refresh.
        component_factory = component_factory or ModelToComponentFactory(
            emit_connector_builder_messages=emit_connector_builder_messages,
            disable_resumable_full_refresh=True,
            connector_state_manager=self._connector_state_manager,
        )

        super().__init__(
            source_config=source_config,
            config=config,
            debug=debug,
            emit_connector_builder_messages=emit_connector_builder_messages,
            component_factory=component_factory,
        )

        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
        if concurrency_level_from_manifest:
            concurrency_level_component = self._constructor.create_component(
                model_type=ConcurrencyLevelModel,
                component_definition=concurrency_level_from_manifest,
                config=config or {},
            )
            if not isinstance(concurrency_level_component, ConcurrencyLevel):
                raise ValueError(
                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
                )

            concurrency_level = concurrency_level_component.get_concurrency_level()
            initial_number_of_partitions_to_generate = max(
                concurrency_level // 2, 1
            )  # Partition generation iterates using a range based on this value. If this is floored to zero, we end up in a deadlock during startup
        else:
            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2

        self._concurrent_source = ConcurrentSource.create(
            num_workers=concurrency_level,
            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
            logger=self.logger,
            slice_logger=self._slice_logger,
            message_repository=self.message_repository,
        )

    # TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state.
    @property
    def is_partially_declarative(self) -> bool:
        """This flag is used to avoid unexpected AbstractStreamFacade processing as concurrent streams."""
        return False

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        concurrent_streams, _ = self._group_streams(config=config)

        # ConcurrentReadProcessor pops streams that are finished being read, so before syncing, the names of
        # the concurrent streams must be saved so that they can be removed from the catalog before starting
        # synchronous streams
        if len(concurrent_streams) > 0:
            concurrent_stream_names = set(
                [concurrent_stream.name for concurrent_stream in concurrent_streams]
            )

            selected_concurrent_streams = self._select_streams(
                streams=concurrent_streams, configured_catalog=catalog
            )
            # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
            # This is also evident in concurrent_source_adapter.py, so I'll leave this out of scope to fix for now
            if selected_concurrent_streams:
                yield from self._concurrent_source.read(selected_concurrent_streams)

            # Sync all streams that are not concurrent compatible. We filter out concurrent streams because the
            # existing AbstractSource.read() implementation iterates over the catalog when syncing streams, many
            # of which were already synced using the Concurrent CDK
            filtered_catalog = self._remove_concurrent_streams_from_catalog(
                catalog=catalog, concurrent_stream_names=concurrent_stream_names
            )
        else:
            filtered_catalog = catalog

        # There is no need to run read for synchronous streams if none exist.
        if not filtered_catalog.streams:
            return

        yield from super().read(logger, config, filtered_catalog, state)

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        concurrent_streams, synchronous_streams = self._group_streams(config=config)
        return AirbyteCatalog(
            streams=[
                stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams
            ]
        )

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        The `streams` method is used as part of the AbstractSource in the following cases:
        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter the catalog to exclude concurrent streams, so not all streams returned by `streams` are actually read)
        Note that `super().streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

        In both cases, we will assume that calling DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
        """
        return super().streams(config)

    def _group_streams(
        self, config: Mapping[str, Any]
    ) -> Tuple[List[AbstractStream], List[Stream]]:
        concurrent_streams: List[AbstractStream] = []
        synchronous_streams: List[Stream] = []

        # Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
        # and this is validated during the initialization of the source.
        streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
            self._source_config, config
        )

        name_to_stream_mapping = {stream["name"]: stream for stream in streams}

        for declarative_stream in self.streams(config=config):
            # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
            # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
            # so we need to treat them as synchronous

            if (
                isinstance(declarative_stream, DeclarativeStream)
                and name_to_stream_mapping[declarative_stream.name]["type"]
                == "StateDelegatingStream"
            ):
                stream_state = self._connector_state_manager.get_stream_state(
                    stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                )

                name_to_stream_mapping[declarative_stream.name] = (
                    name_to_stream_mapping[declarative_stream.name]["incremental_stream"]
                    if stream_state
                    else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"]
                )

            if isinstance(declarative_stream, DeclarativeStream) and (
                name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
                == "SimpleRetriever"
                or name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
                == "AsyncRetriever"
            ):
                incremental_sync_component_definition = name_to_stream_mapping[
                    declarative_stream.name
                ].get("incremental_sync")

                partition_router_component_definition = (
                    name_to_stream_mapping[declarative_stream.name]
                    .get("retriever", {})
                    .get("partition_router")
                )
                is_without_partition_router_or_cursor = not bool(
                    incremental_sync_component_definition
                ) and not bool(partition_router_component_definition)

                is_substream_without_incremental = (
                    partition_router_component_definition
                    and not incremental_sync_component_definition
                )

                if self._is_concurrent_cursor_incremental_without_partition_routing(
                    declarative_stream, incremental_sync_component_definition
                ):
                    stream_state = self._connector_state_manager.get_stream_state(
                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                    )
                    stream_state = self._migrate_state(declarative_stream, stream_state)

                    retriever = self._get_retriever(declarative_stream, stream_state)

                    if isinstance(declarative_stream.retriever, AsyncRetriever) and isinstance(
                        declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter
                    ):
                        cursor = declarative_stream.retriever.stream_slicer.stream_slicer

                        if not isinstance(cursor, ConcurrentCursor | ConcurrentPerPartitionCursor):
                            # This should never happen since we instantiate ConcurrentCursor in
                            # model_to_component_factory.py
                            raise ValueError(
                                f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received {cursor.__class__}"
                            )

                        partition_generator = StreamSlicerPartitionGenerator(
                            partition_factory=DeclarativePartitionFactory(
                                declarative_stream.name,
                                declarative_stream.get_json_schema(),
                                retriever,
                                self.message_repository,
                            ),
                            stream_slicer=declarative_stream.retriever.stream_slicer,
                        )
                    else:
                        if (
                            incremental_sync_component_definition
                            and incremental_sync_component_definition.get("type")
                            == IncrementingCountCursorModel.__name__
                        ):
                            cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor(
                                model_type=IncrementingCountCursorModel,
                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
                                stream_name=declarative_stream.name,
                                stream_namespace=declarative_stream.namespace,
                                config=config or {},
                            )
                        else:
                            cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
                                model_type=DatetimeBasedCursorModel,
                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
                                stream_name=declarative_stream.name,
                                stream_namespace=declarative_stream.namespace,
                                config=config or {},
                            )
                        partition_generator = StreamSlicerPartitionGenerator(
                            partition_factory=DeclarativePartitionFactory(
                                declarative_stream.name,
                                declarative_stream.get_json_schema(),
                                retriever,
                                self.message_repository,
                            ),
                            stream_slicer=cursor,
                        )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=cursor.cursor_field.cursor_field_key
                            if hasattr(cursor, "cursor_field")
                            and hasattr(
                                cursor.cursor_field, "cursor_field_key"
                            )  # FIXME this will need to be updated once we do the per partition
                            else None,
                            logger=self.logger,
                            cursor=cursor,
                        )
                    )
                elif (
                    is_substream_without_incremental or is_without_partition_router_or_cursor
                ) and hasattr(declarative_stream.retriever, "stream_slicer"):
                    partition_generator = StreamSlicerPartitionGenerator(
                        DeclarativePartitionFactory(
                            declarative_stream.name,
                            declarative_stream.get_json_schema(),
                            declarative_stream.retriever,
                            self.message_repository,
                        ),
                        declarative_stream.retriever.stream_slicer,
                    )

                    final_state_cursor = FinalStateCursor(
                        stream_name=declarative_stream.name,
                        stream_namespace=declarative_stream.namespace,
                        message_repository=self.message_repository,
                    )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=None,
                            logger=self.logger,
                            cursor=final_state_cursor,
                        )
                    )
                elif (
                    incremental_sync_component_definition
                    and incremental_sync_component_definition.get("type", "")
                    == DatetimeBasedCursorModel.__name__
                    and hasattr(declarative_stream.retriever, "stream_slicer")
                    and isinstance(
                        declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor
                    )
                ):
                    stream_state = self._connector_state_manager.get_stream_state(
                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                    )
                    stream_state = self._migrate_state(declarative_stream, stream_state)

                    partition_router = declarative_stream.retriever.stream_slicer._partition_router

                    perpartition_cursor = (
                        self._constructor.create_concurrent_cursor_from_perpartition_cursor(
                            state_manager=self._connector_state_manager,
                            model_type=DatetimeBasedCursorModel,
                            component_definition=incremental_sync_component_definition,
                            stream_name=declarative_stream.name,
                            stream_namespace=declarative_stream.namespace,
                            config=config or {},
                            stream_state=stream_state,
                            partition_router=partition_router,
                        )
                    )

                    retriever = self._get_retriever(declarative_stream, stream_state)

                    partition_generator = StreamSlicerPartitionGenerator(
                        DeclarativePartitionFactory(
                            declarative_stream.name,
                            declarative_stream.get_json_schema(),
                            retriever,
                            self.message_repository,
                        ),
                        perpartition_cursor,
                    )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
                            logger=self.logger,
                            cursor=perpartition_cursor,
                        )
                    )
                else:
                    synchronous_streams.append(declarative_stream)
            # TODO: Remove this. This check is necessary to safely migrate Stripe during the transition state.
            # The condition below needs to ensure that concurrent support is not lost for sources that already
            # supported it before migration but are now only partially migrated to the declarative implementation (e.g., Stripe).
            elif (
                isinstance(declarative_stream, AbstractStreamFacade)
                and self.is_partially_declarative
            ):
                concurrent_streams.append(declarative_stream.get_underlying_stream())
            else:
                synchronous_streams.append(declarative_stream)

        return concurrent_streams, synchronous_streams

    def _is_concurrent_cursor_incremental_without_partition_routing(
        self,
        declarative_stream: DeclarativeStream,
        incremental_sync_component_definition: Mapping[str, Any] | None,
    ) -> bool:
        return (
            incremental_sync_component_definition is not None
            and bool(incremental_sync_component_definition)
            and (
                incremental_sync_component_definition.get("type", "")
                in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__)
            )
            and hasattr(declarative_stream.retriever, "stream_slicer")
            and (
                isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
                # IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor;
                # add an isinstance check here if we want to create a Declarative IncrementingCountCursor
                # or isinstance(
                #     declarative_stream.retriever.stream_slicer, IncrementingCountCursor
                # )
                or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
            )
        )

    @staticmethod
    def _get_retriever(
        declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
    ) -> Retriever:
        retriever = declarative_stream.retriever

        # This is an optimization so that we don't invoke any cursor or state management flows within the
        # low-code framework because state management is handled through the ConcurrentCursor.
        if declarative_stream and isinstance(retriever, SimpleRetriever):
            # Also a temporary hack. In the legacy Stream implementation, as part of the read,
            # set_initial_state() is called to instantiate incoming state on the cursor. Although we no
            # longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
            # like StopConditionPaginationStrategyDecorator still rely on a DatetimeBasedCursor that is
            # properly initialized with state.
            if retriever.cursor:
                retriever.cursor.set_initial_state(stream_state=stream_state)

            # Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance
            # from the one initialized on the SimpleRetriever, so it must also have state initialized
            # for semi-incremental streams using is_client_side_incremental to filter properly
            if isinstance(retriever.record_selector, RecordSelector) and isinstance(
                retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
            ):
                retriever.record_selector.record_filter._cursor.set_initial_state(
                    stream_state=stream_state
                )  # type: ignore  # After non-concurrent cursors are deprecated we can remove these cursor workarounds

            # We zero it out here, but since this is a cursor reference, the state is still properly
            # instantiated for the other components that reference it
            retriever.cursor = None

        return retriever

    @staticmethod
    def _select_streams(
        streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
    ) -> List[AbstractStream]:
        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
        abstract_streams: List[AbstractStream] = []
        for configured_stream in configured_catalog.streams:
            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
            if stream_instance:
                abstract_streams.append(stream_instance)

        return abstract_streams

    @staticmethod
    def _remove_concurrent_streams_from_catalog(
        catalog: ConfiguredAirbyteCatalog,
        concurrent_stream_names: set[str],
    ) -> ConfiguredAirbyteCatalog:
        return ConfiguredAirbyteCatalog(
            streams=[
                stream
                for stream in catalog.streams
                if stream.stream.name not in concurrent_stream_names
            ]
        )

    @staticmethod
    def _migrate_state(
        declarative_stream: DeclarativeStream, stream_state: MutableMapping[str, Any]
    ) -> MutableMapping[str, Any]:
        for state_migration in declarative_stream.state_migrations:
            if state_migration.should_migrate(stream_state):
                # The state variable is expected to be mutable but the migrate method returns an immutable mapping.
                stream_state = dict(state_migration.migrate(stream_state))

        return stream_state
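To make the worker-sizing rules in __init__ concrete, here is a minimal sketch of the arithmetic that maps a manifest-level concurrency_level onto the arguments passed to ConcurrentSource.create. The helper name compute_worker_settings and the assumption that default_concurrency resolves to a plain integer are illustrative only; in the real source the value is resolved through the ConcurrencyLevel component, which may interpolate from config.

# Hypothetical illustration of the sizing logic in __init__ above.
_LOWEST_SAFE_CONCURRENCY_LEVEL = 2  # same floor as the class constant

def compute_worker_settings(manifest: dict) -> tuple[int, int]:
    """Return (num_workers, initial_number_of_partitions_to_generate)."""
    if manifest.get("concurrency_level"):
        # Assumption: the resolved concurrency level is a plain integer here.
        resolved_level = int(manifest["concurrency_level"]["default_concurrency"])
        # Flooring the partition count to at least 1 avoids the startup
        # deadlock the comment in __init__ warns about when the level is 1.
        return resolved_level, max(resolved_level // 2, 1)
    # Without a manifest setting, fall back to the lowest safe level:
    # 2 workers and 1 initial partition to generate.
    return _LOWEST_SAFE_CONCURRENCY_LEVEL, _LOWEST_SAFE_CONCURRENCY_LEVEL // 2

# compute_worker_settings({"concurrency_level": {"default_concurrency": 10}}) -> (10, 5)
# compute_worker_settings({}) -> (2, 1)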
class ConcurrentDeclarativeSource(airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource, typing.Generic[~TState]):
Declarative source defined by a manifest of low-code components that define source connector behavior
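As a usage sketch (not taken from the source itself), a connector entrypoint might construct and read from the source roughly as follows. The manifest, config, and configured_catalog values are placeholders you would load yourself (e.g. a parsed manifest.yaml dict, the connector configuration, and a ConfiguredAirbyteCatalog built elsewhere).

# A minimal sketch, assuming `manifest`, `config`, and `configured_catalog`
# are defined by the caller; the names are illustrative placeholders.
import logging

from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
)

logger = logging.getLogger("airbyte")

source = ConcurrentDeclarativeSource(
    catalog=configured_catalog,
    config=config,
    state=[],  # no prior state; otherwise a List[AirbyteStateMessage]
    source_config=manifest,
)

# Concurrent-compatible streams are read through the Concurrent CDK first,
# then any remaining synchronous streams go through the regular read path.
for message in source.read(logger, config, configured_catalog, state=[]):
    print(message.type)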
is_partially_declarative: bool
@property
def is_partially_declarative(self) -> bool:
    """This flag is used to avoid unexpected AbstractStreamFacade processing as concurrent streams."""
    return False
This flag is used to avoid unexpected AbstractStreamFacade processing as concurrent streams.
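For a source that is only partially migrated to the declarative framework, a subclass could opt back into concurrent handling of its legacy streams by overriding this property. This is a hypothetical subclass sketch, not code from the CDK; it assumes ConcurrentDeclarativeSource is imported as in the earlier example.

# Hypothetical subclass: with the flag set to True, _group_streams keeps
# treating AbstractStreamFacade streams as concurrent via their underlying
# stream instead of demoting them to the synchronous path.
class PartiallyMigratedSource(ConcurrentDeclarativeSource):
    @property
    def is_partially_declarative(self) -> bool:
        return True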
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
    concurrent_streams, synchronous_streams = self._group_streams(config=config)
    return AirbyteCatalog(
        streams=[
            stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams
        ]
    )
Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
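A brief usage sketch, reusing the placeholder source and config from the earlier example: discover groups streams the same way read does, then renders each one as an AirbyteStream in the returned catalog.

# Assuming `source` and `config` from the earlier sketch.
import logging

catalog = source.discover(logging.getLogger("airbyte"), config)
for stream in catalog.streams:
    print(stream.name, stream.supported_sync_modes)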