airbyte_cdk.sources.declarative.concurrent_declarative_source
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple

from airbyte_cdk.models import (
    AirbyteCatalog,
    AirbyteMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
)
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.extractors import RecordSelector
from airbyte_cdk.sources.declarative.extractors.record_filter import (
    ClientSideIncrementalRecordFilterDecorator,
)
from airbyte_cdk.sources.declarative.incremental import (
    ConcurrentPerPartitionCursor,
    GlobalSubstreamCursor,
)
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
    PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.models import FileUploader
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    ConcurrencyLevel as ConcurrencyLevelModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DatetimeBasedCursor as DatetimeBasedCursorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    IncrementingCountCursor as IncrementingCountCursorModel,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
    DeclarativePartitionFactory,
    StreamSlicerPartitionGenerator,
)
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
from airbyte_cdk.sources.source import TState
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
    AlwaysAvailableAvailabilityStrategy,
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream


class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]):
    # By default, we defer to a value of 2. A value lower than that could cause a PartitionEnqueuer to be stuck in a state of deadlock
    # because it has hit the limit of futures but no partition reader is consuming them.
    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2

    def __init__(
        self,
        catalog: Optional[ConfiguredAirbyteCatalog],
        config: Optional[Mapping[str, Any]],
        state: TState,
        source_config: ConnectionDefinition,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        component_factory: Optional[ModelToComponentFactory] = None,
        **kwargs: Any,
    ) -> None:
        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
        # no longer needs to store the original incoming state. But maybe there's an edge case?
        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
        # incremental streams running in full refresh.
        component_factory = component_factory or ModelToComponentFactory(
            emit_connector_builder_messages=emit_connector_builder_messages,
            disable_resumable_full_refresh=True,
            connector_state_manager=self._connector_state_manager,
        )

        super().__init__(
            source_config=source_config,
            config=config,
            debug=debug,
            emit_connector_builder_messages=emit_connector_builder_messages,
            component_factory=component_factory,
        )

        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
        if concurrency_level_from_manifest:
            concurrency_level_component = self._constructor.create_component(
                model_type=ConcurrencyLevelModel,
                component_definition=concurrency_level_from_manifest,
                config=config or {},
            )
            if not isinstance(concurrency_level_component, ConcurrencyLevel):
                raise ValueError(
                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
                )

            concurrency_level = concurrency_level_component.get_concurrency_level()
            initial_number_of_partitions_to_generate = max(
                concurrency_level // 2, 1
            )  # Partition_generation iterates using range based on this value. If this is floored to zero we end up in a deadlock during start up
        else:
            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2

        self._concurrent_source = ConcurrentSource.create(
            num_workers=concurrency_level,
            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
            logger=self.logger,
            slice_logger=self._slice_logger,
            message_repository=self.message_repository,
        )

    # TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state.
    @property
    def is_partially_declarative(self) -> bool:
        """This flag is used to avoid processing unexpected AbstractStreamFacade instances as concurrent streams."""
        return False

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        concurrent_streams, _ = self._group_streams(config=config)

        # ConcurrentReadProcessor pops streams that are finished being read, so before syncing, the names of
        # the concurrent streams must be saved so that they can be removed from the catalog before starting
        # synchronous streams
        if len(concurrent_streams) > 0:
            concurrent_stream_names = set(
                [concurrent_stream.name for concurrent_stream in concurrent_streams]
            )

            selected_concurrent_streams = self._select_streams(
                streams=concurrent_streams, configured_catalog=catalog
            )
            # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
            # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
            if selected_concurrent_streams:
                yield from self._concurrent_source.read(selected_concurrent_streams)

            # Sync all streams that are not concurrent compatible. We filter out concurrent streams because the
            # existing AbstractSource.read() implementation iterates over the catalog when syncing streams, many
            # of which were already synced using the Concurrent CDK
            filtered_catalog = self._remove_concurrent_streams_from_catalog(
                catalog=catalog, concurrent_stream_names=concurrent_stream_names
            )
        else:
            filtered_catalog = catalog

        # There is no need to run read for synchronous streams if none exist.
        if not filtered_catalog.streams:
            return

        yield from super().read(logger, config, filtered_catalog, state)

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        concurrent_streams, synchronous_streams = self._group_streams(config=config)
        return AirbyteCatalog(
            streams=[
                stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams
            ]
        )

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        The `streams` method is used as part of the AbstractSource in the following cases:
        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog that excludes concurrent streams, so not all streams returned by `streams` are actually read)
        Note that `super().streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

        In both cases, we assume that calling the DeclarativeStream is perfectly fine as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
        """
        return super().streams(config)

    def _group_streams(
        self, config: Mapping[str, Any]
    ) -> Tuple[List[AbstractStream], List[Stream]]:
        concurrent_streams: List[AbstractStream] = []
        synchronous_streams: List[Stream] = []

        # Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
        # and this is validated during the initialization of the source.
        streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
            self._source_config, config
        )

        name_to_stream_mapping = {stream["name"]: stream for stream in streams}

        for declarative_stream in self.streams(config=config):
            # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
            # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
            # so we need to treat them as synchronous

            supports_file_transfer = (
                isinstance(declarative_stream, DeclarativeStream)
                and "file_uploader" in name_to_stream_mapping[declarative_stream.name]
            )

            if (
                isinstance(declarative_stream, DeclarativeStream)
                and name_to_stream_mapping[declarative_stream.name]["type"]
                == "StateDelegatingStream"
            ):
                stream_state = self._connector_state_manager.get_stream_state(
                    stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                )

                name_to_stream_mapping[declarative_stream.name] = (
                    name_to_stream_mapping[declarative_stream.name]["incremental_stream"]
                    if stream_state
                    else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"]
                )

            if isinstance(declarative_stream, DeclarativeStream) and (
                name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
                == "SimpleRetriever"
                or name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
                == "AsyncRetriever"
            ):
                incremental_sync_component_definition = name_to_stream_mapping[
                    declarative_stream.name
                ].get("incremental_sync")

                partition_router_component_definition = (
                    name_to_stream_mapping[declarative_stream.name]
                    .get("retriever", {})
                    .get("partition_router")
                )
                is_without_partition_router_or_cursor = not bool(
                    incremental_sync_component_definition
                ) and not bool(partition_router_component_definition)

                is_substream_without_incremental = (
                    partition_router_component_definition
                    and not incremental_sync_component_definition
                )

                if self._is_concurrent_cursor_incremental_without_partition_routing(
                    declarative_stream, incremental_sync_component_definition
                ):
                    stream_state = self._connector_state_manager.get_stream_state(
                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                    )
                    stream_state = self._migrate_state(declarative_stream, stream_state)

                    retriever = self._get_retriever(declarative_stream, stream_state)

                    if isinstance(declarative_stream.retriever, AsyncRetriever) and isinstance(
                        declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter
                    ):
                        cursor = declarative_stream.retriever.stream_slicer.stream_slicer

                        if not isinstance(cursor, ConcurrentCursor | ConcurrentPerPartitionCursor):
                            # This should never happen since we instantiate ConcurrentCursor in
                            # model_to_component_factory.py
                            raise ValueError(
                                f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received {cursor.__class__}"
                            )

                        partition_generator = StreamSlicerPartitionGenerator(
                            partition_factory=DeclarativePartitionFactory(
                                declarative_stream.name,
                                declarative_stream.get_json_schema(),
                                retriever,
                                self.message_repository,
                            ),
                            stream_slicer=declarative_stream.retriever.stream_slicer,
                        )
                    else:
                        if (
                            incremental_sync_component_definition
                            and incremental_sync_component_definition.get("type")
                            == IncrementingCountCursorModel.__name__
                        ):
                            cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor(
                                model_type=IncrementingCountCursorModel,
                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
                                stream_name=declarative_stream.name,
                                stream_namespace=declarative_stream.namespace,
                                config=config or {},
                            )
                        else:
                            cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
                                model_type=DatetimeBasedCursorModel,
                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
                                stream_name=declarative_stream.name,
                                stream_namespace=declarative_stream.namespace,
                                config=config or {},
                                stream_state_migrations=declarative_stream.state_migrations,
                            )
                        partition_generator = StreamSlicerPartitionGenerator(
                            partition_factory=DeclarativePartitionFactory(
                                declarative_stream.name,
                                declarative_stream.get_json_schema(),
                                retriever,
                                self.message_repository,
                            ),
                            stream_slicer=cursor,
                        )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=cursor.cursor_field.cursor_field_key
                            if hasattr(cursor, "cursor_field")
                            and hasattr(
                                cursor.cursor_field, "cursor_field_key"
                            )  # FIXME this will need to be updated once we do the per partition
                            else None,
                            logger=self.logger,
                            cursor=cursor,
                            supports_file_transfer=supports_file_transfer,
                        )
                    )
                elif (
                    is_substream_without_incremental or is_without_partition_router_or_cursor
                ) and hasattr(declarative_stream.retriever, "stream_slicer"):
                    partition_generator = StreamSlicerPartitionGenerator(
                        DeclarativePartitionFactory(
                            declarative_stream.name,
                            declarative_stream.get_json_schema(),
                            declarative_stream.retriever,
                            self.message_repository,
                        ),
                        declarative_stream.retriever.stream_slicer,
                    )

                    final_state_cursor = FinalStateCursor(
                        stream_name=declarative_stream.name,
                        stream_namespace=declarative_stream.namespace,
                        message_repository=self.message_repository,
                    )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=None,
                            logger=self.logger,
                            cursor=final_state_cursor,
                            supports_file_transfer=supports_file_transfer,
                        )
                    )
                elif (
                    incremental_sync_component_definition
                    and incremental_sync_component_definition.get("type", "")
                    == DatetimeBasedCursorModel.__name__
                    and hasattr(declarative_stream.retriever, "stream_slicer")
                    and isinstance(
                        declarative_stream.retriever.stream_slicer,
                        (GlobalSubstreamCursor, PerPartitionWithGlobalCursor),
                    )
                ):
                    stream_state = self._connector_state_manager.get_stream_state(
                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                    )
                    stream_state = self._migrate_state(declarative_stream, stream_state)

                    partition_router = declarative_stream.retriever.stream_slicer._partition_router

                    perpartition_cursor = (
                        self._constructor.create_concurrent_cursor_from_perpartition_cursor(
                            state_manager=self._connector_state_manager,
                            model_type=DatetimeBasedCursorModel,
                            component_definition=incremental_sync_component_definition,
                            stream_name=declarative_stream.name,
                            stream_namespace=declarative_stream.namespace,
                            config=config or {},
                            stream_state=stream_state,
                            partition_router=partition_router,
                        )
                    )

                    retriever = self._get_retriever(declarative_stream, stream_state)

                    partition_generator = StreamSlicerPartitionGenerator(
                        DeclarativePartitionFactory(
                            declarative_stream.name,
                            declarative_stream.get_json_schema(),
                            retriever,
                            self.message_repository,
                        ),
                        perpartition_cursor,
                    )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
                            logger=self.logger,
                            cursor=perpartition_cursor,
                            supports_file_transfer=supports_file_transfer,
                        )
                    )
                else:
                    synchronous_streams.append(declarative_stream)
            # TODO: Remove this. This check is necessary to safely migrate Stripe during the transition state.
            # Condition below needs to ensure that concurrent support is not lost for sources that already support
            # it before migration, but now are only partially migrated to declarative implementation (e.g., Stripe).
            elif (
                isinstance(declarative_stream, AbstractStreamFacade)
                and self.is_partially_declarative
            ):
                concurrent_streams.append(declarative_stream.get_underlying_stream())
            else:
                synchronous_streams.append(declarative_stream)

        return concurrent_streams, synchronous_streams

    def _is_concurrent_cursor_incremental_without_partition_routing(
        self,
        declarative_stream: DeclarativeStream,
        incremental_sync_component_definition: Mapping[str, Any] | None,
    ) -> bool:
        return (
            incremental_sync_component_definition is not None
            and bool(incremental_sync_component_definition)
            and (
                incremental_sync_component_definition.get("type", "")
                in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__)
            )
            and hasattr(declarative_stream.retriever, "stream_slicer")
            and (
                isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
                # IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor
                # add isinstance check here if we want to create a Declarative IncrementingCountCursor
                # or isinstance(
                #     declarative_stream.retriever.stream_slicer, IncrementingCountCursor
                # )
                or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
            )
        )

    @staticmethod
    def _get_retriever(
        declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
    ) -> Retriever:
        retriever = declarative_stream.retriever

        # This is an optimization so that we don't invoke any cursor or state management flows within the
        # low-code framework because state management is handled through the ConcurrentCursor.
        if declarative_stream and isinstance(retriever, SimpleRetriever):
            # Also a temporary hack. In the legacy Stream implementation, as part of the read,
            # set_initial_state() is called to instantiate incoming state on the cursor. Although we no
            # longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
            # like StopConditionPaginationStrategyDecorator still rely on a DatetimeBasedCursor that is
            # properly initialized with state.
            if retriever.cursor:
                retriever.cursor.set_initial_state(stream_state=stream_state)

            # Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance
            # from the one initialized on the SimpleRetriever, so it must also have state initialized
            # for semi-incremental streams using is_client_side_incremental to filter properly
            if isinstance(retriever.record_selector, RecordSelector) and isinstance(
                retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
            ):
                retriever.record_selector.record_filter._cursor.set_initial_state(
                    stream_state=stream_state
                )  # type: ignore  # After non-concurrent cursors are deprecated we can remove these cursor workarounds

            # We zero it out here, but since this is a cursor reference, the state is still properly
            # instantiated for the other components that reference it
            retriever.cursor = None

        return retriever

    @staticmethod
    def _select_streams(
        streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
    ) -> List[AbstractStream]:
        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
        abstract_streams: List[AbstractStream] = []
        for configured_stream in configured_catalog.streams:
            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
            if stream_instance:
                abstract_streams.append(stream_instance)

        return abstract_streams

    @staticmethod
    def _remove_concurrent_streams_from_catalog(
        catalog: ConfiguredAirbyteCatalog,
        concurrent_stream_names: set[str],
    ) -> ConfiguredAirbyteCatalog:
        return ConfiguredAirbyteCatalog(
            streams=[
                stream
                for stream in catalog.streams
                if stream.stream.name not in concurrent_stream_names
            ]
        )

    @staticmethod
    def _migrate_state(
        declarative_stream: DeclarativeStream, stream_state: MutableMapping[str, Any]
    ) -> MutableMapping[str, Any]:
        for state_migration in declarative_stream.state_migrations:
            if state_migration.should_migrate(stream_state):
                # The state variable is expected to be mutable but the migrate method returns an immutable mapping.
                stream_state = dict(state_migration.migrate(stream_state))

        return stream_state
class
ConcurrentDeclarativeSource(airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource, typing.Generic[~TState]):
Declarative source defined by a manifest of low-code components that define source connector behavior
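A minimal usage sketch follows. The constructor arguments match the signature above; the manifest, config, and stream definition are hypothetical and abbreviated, and a real manifest must validate against the declarative component schema.

    import logging

    from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
        ConcurrentDeclarativeSource,
    )

    # Hypothetical, abbreviated manifest; a real one must validate against the
    # declarative component schema and usually defines auth, pagination, schemas, etc.
    manifest = {
        "version": "6.0.0",
        "type": "DeclarativeSource",
        "check": {"type": "CheckStream", "stream_names": ["widgets"]},
        "concurrency_level": {"type": "ConcurrencyLevel", "default_concurrency": 4},
        "streams": [
            {
                "type": "DeclarativeStream",
                "name": "widgets",
                "retriever": {
                    "type": "SimpleRetriever",
                    "requester": {
                        "type": "HttpRequester",
                        "url_base": "https://api.example.com",
                        "path": "/widgets",
                        "http_method": "GET",
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {"type": "DpathExtractor", "field_path": []},
                    },
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {"type": "object", "properties": {}},
                },
            }
        ],
    }

    # catalog and state may be left as None when only running check or discover.
    source = ConcurrentDeclarativeSource(
        catalog=None,
        config={},
        state=None,
        source_config=manifest,
    )
    catalog = source.discover(logging.getLogger("airbyte"), config={})

Because concurrency_level is present in the manifest, the constructor creates a ConcurrentSource with four workers; without it, the class falls back to _LOWEST_SAFE_CONCURRENCY_LEVEL (2).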
is_partially_declarative: bool
@property
def is_partially_declarative(self) -> bool:
    """This flag is used to avoid processing unexpected AbstractStreamFacade instances as concurrent streams."""
    return False
This flag is used to avoid processing unexpected AbstractStreamFacade instances as concurrent streams.
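The TODO comments in the source tie this property to sources that are only partially migrated to the declarative framework (e.g., Stripe). A hypothetical subclass that still mixes legacy Python streams with declarative ones could override it so that _group_streams() routes AbstractStreamFacade streams to the concurrent path:

    from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
        ConcurrentDeclarativeSource,
    )


    class PartiallyDeclarativeSource(ConcurrentDeclarativeSource):
        """Hypothetical source that is only partially migrated to the declarative framework."""

        @property
        def is_partially_declarative(self) -> bool:
            # With True, _group_streams() appends the underlying stream of any
            # AbstractStreamFacade to the concurrent list instead of the synchronous one.
            return True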
def
discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
    concurrent_streams, synchronous_streams = self._group_streams(config=config)
    return AirbyteCatalog(
        streams=[
            stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams
        ]
    )
Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
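A short sketch of invoking discover, assuming `source` is the ConcurrentDeclarativeSource built in the earlier example:

    import logging

    logger = logging.getLogger("airbyte")
    catalog = source.discover(logger=logger, config={})
    for stream in catalog.streams:
        # Each entry is an AirbyteStream carrying the name and JSON schema the stream advertises.
        print(stream.name, sorted(stream.json_schema.get("properties", {})))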