airbyte_cdk.sources.declarative.concurrent_declarative_source
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple

from airbyte_cdk.models import (
    AirbyteCatalog,
    AirbyteMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
)
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.extractors import RecordSelector
from airbyte_cdk.sources.declarative.extractors.record_filter import (
    ClientSideIncrementalRecordFilterDecorator,
)
from airbyte_cdk.sources.declarative.incremental import (
    ConcurrentPerPartitionCursor,
    GlobalSubstreamCursor,
)
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
    PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.models import FileUploader
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    ConcurrencyLevel as ConcurrencyLevelModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DatetimeBasedCursor as DatetimeBasedCursorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    IncrementingCountCursor as IncrementingCountCursorModel,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
    DeclarativePartitionFactory,
    StreamSlicerPartitionGenerator,
)
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
from airbyte_cdk.sources.source import TState
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
    AlwaysAvailableAvailabilityStrategy,
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream


class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]):
    # By default, we defer to a value of 2. A value lower than this could cause a PartitionEnqueuer to be stuck in a
    # deadlock because it has hit the limit of futures but no partition reader is consuming them.
    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2

    def __init__(
        self,
        catalog: Optional[ConfiguredAirbyteCatalog],
        config: Optional[Mapping[str, Any]],
        state: TState,
        source_config: ConnectionDefinition,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        component_factory: Optional[ModelToComponentFactory] = None,
        config_path: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
        # no longer needs to store the original incoming state. But maybe there's an edge case?
        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
        # incremental streams running in full refresh.
        component_factory = component_factory or ModelToComponentFactory(
            emit_connector_builder_messages=emit_connector_builder_messages,
            disable_resumable_full_refresh=True,
            connector_state_manager=self._connector_state_manager,
        )

        super().__init__(
            source_config=source_config,
            config=config,
            debug=debug,
            emit_connector_builder_messages=emit_connector_builder_messages,
            component_factory=component_factory,
            config_path=config_path,
        )

        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
        if concurrency_level_from_manifest:
            concurrency_level_component = self._constructor.create_component(
                model_type=ConcurrencyLevelModel,
                component_definition=concurrency_level_from_manifest,
                config=config or {},
            )
            if not isinstance(concurrency_level_component, ConcurrencyLevel):
                raise ValueError(
                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
                )

            concurrency_level = concurrency_level_component.get_concurrency_level()
            initial_number_of_partitions_to_generate = max(
                concurrency_level // 2, 1
            )  # Partition generation iterates using range based on this value. If this is floored to zero we end up in a deadlock during start up
        else:
            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2

        self._concurrent_source = ConcurrentSource.create(
            num_workers=concurrency_level,
            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
            logger=self.logger,
            slice_logger=self._slice_logger,
            message_repository=self.message_repository,
        )

    # TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state.
    @property
    def is_partially_declarative(self) -> bool:
        """This flag is used to avoid unexpectedly processing AbstractStreamFacade streams as concurrent streams."""
        return False

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        concurrent_streams, _ = self._group_streams(config=config)

        # ConcurrentReadProcessor pops streams that are finished being read, so before syncing, the names of
        # the concurrent streams must be saved so that they can be removed from the catalog before starting
        # synchronous streams
        if len(concurrent_streams) > 0:
            concurrent_stream_names = set(
                [concurrent_stream.name for concurrent_stream in concurrent_streams]
            )

            selected_concurrent_streams = self._select_streams(
                streams=concurrent_streams, configured_catalog=catalog
            )
            # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
            # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
            if selected_concurrent_streams:
                yield from self._concurrent_source.read(selected_concurrent_streams)

            # Sync all streams that are not concurrent compatible. We filter out concurrent streams because the
            # existing AbstractSource.read() implementation iterates over the catalog when syncing streams, many
            # of which were already synced using the Concurrent CDK
            filtered_catalog = self._remove_concurrent_streams_from_catalog(
                catalog=catalog, concurrent_stream_names=concurrent_stream_names
            )
        else:
            filtered_catalog = catalog

        # There is no need to run read for synchronous streams if none exist.
        if not filtered_catalog.streams:
            return

        yield from super().read(logger, config, filtered_catalog, state)

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        concurrent_streams, synchronous_streams = self._group_streams(config=config)
        return AirbyteCatalog(
            streams=[
                stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams
            ]
        )

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        The `streams` method is used as part of the AbstractSource in the following cases:
        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams, so not all of the streams returned by `streams` are actually read)
        Note that `super.streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

        In both cases, we will assume that calling the DeclarativeStream is perfectly fine as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
        """
        return super().streams(config)

    def _group_streams(
        self, config: Mapping[str, Any]
    ) -> Tuple[List[AbstractStream], List[Stream]]:
        concurrent_streams: List[AbstractStream] = []
        synchronous_streams: List[Stream] = []

        # Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
        # and this is validated during the initialization of the source.
        streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
            self._source_config, config
        )

        name_to_stream_mapping = {stream["name"]: stream for stream in streams}

        for declarative_stream in self.streams(config=config):
            # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
            # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
            # so we need to treat them as synchronous

            supports_file_transfer = (
                isinstance(declarative_stream, DeclarativeStream)
                and "file_uploader" in name_to_stream_mapping[declarative_stream.name]
            )

            if (
                isinstance(declarative_stream, DeclarativeStream)
                and name_to_stream_mapping[declarative_stream.name]["type"]
                == "StateDelegatingStream"
            ):
                stream_state = self._connector_state_manager.get_stream_state(
                    stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                )

                name_to_stream_mapping[declarative_stream.name] = (
                    name_to_stream_mapping[declarative_stream.name]["incremental_stream"]
                    if stream_state
                    else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"]
                )

            if isinstance(declarative_stream, DeclarativeStream) and (
                name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
                == "SimpleRetriever"
                or name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
                == "AsyncRetriever"
            ):
                incremental_sync_component_definition = name_to_stream_mapping[
                    declarative_stream.name
                ].get("incremental_sync")

                partition_router_component_definition = (
                    name_to_stream_mapping[declarative_stream.name]
                    .get("retriever", {})
                    .get("partition_router")
                )
                is_without_partition_router_or_cursor = not bool(
                    incremental_sync_component_definition
                ) and not bool(partition_router_component_definition)

                is_substream_without_incremental = (
                    partition_router_component_definition
                    and not incremental_sync_component_definition
                )

                if self._is_concurrent_cursor_incremental_without_partition_routing(
                    declarative_stream, incremental_sync_component_definition
                ):
                    stream_state = self._connector_state_manager.get_stream_state(
                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                    )
                    stream_state = self._migrate_state(declarative_stream, stream_state)

                    retriever = self._get_retriever(declarative_stream, stream_state)

                    if isinstance(declarative_stream.retriever, AsyncRetriever) and isinstance(
                        declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter
                    ):
                        cursor = declarative_stream.retriever.stream_slicer.stream_slicer

                        if not isinstance(cursor, ConcurrentCursor | ConcurrentPerPartitionCursor):
                            # This should never happen since we instantiate ConcurrentCursor in
                            # model_to_component_factory.py
                            raise ValueError(
                                f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received {cursor.__class__}"
                            )

                        partition_generator = StreamSlicerPartitionGenerator(
                            partition_factory=DeclarativePartitionFactory(
                                declarative_stream.name,
                                declarative_stream.get_json_schema(),
                                retriever,
                                self.message_repository,
                            ),
                            stream_slicer=declarative_stream.retriever.stream_slicer,
                        )
                    else:
                        if (
                            incremental_sync_component_definition
                            and incremental_sync_component_definition.get("type")
                            == IncrementingCountCursorModel.__name__
                        ):
                            cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor(
                                model_type=IncrementingCountCursorModel,
                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
                                stream_name=declarative_stream.name,
                                stream_namespace=declarative_stream.namespace,
                                config=config or {},
                            )
                        else:
                            cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
                                model_type=DatetimeBasedCursorModel,
                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
                                stream_name=declarative_stream.name,
                                stream_namespace=declarative_stream.namespace,
                                config=config or {},
                                stream_state_migrations=declarative_stream.state_migrations,
                            )
                        partition_generator = StreamSlicerPartitionGenerator(
                            partition_factory=DeclarativePartitionFactory(
                                declarative_stream.name,
                                declarative_stream.get_json_schema(),
                                retriever,
                                self.message_repository,
                            ),
                            stream_slicer=cursor,
                        )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=cursor.cursor_field.cursor_field_key
                            if hasattr(cursor, "cursor_field")
                            and hasattr(
                                cursor.cursor_field, "cursor_field_key"
                            )  # FIXME this will need to be updated once we do the per partition
                            else None,
                            logger=self.logger,
                            cursor=cursor,
                            supports_file_transfer=supports_file_transfer,
                        )
                    )
                elif (
                    is_substream_without_incremental or is_without_partition_router_or_cursor
                ) and hasattr(declarative_stream.retriever, "stream_slicer"):
                    partition_generator = StreamSlicerPartitionGenerator(
                        DeclarativePartitionFactory(
                            declarative_stream.name,
                            declarative_stream.get_json_schema(),
                            declarative_stream.retriever,
                            self.message_repository,
                        ),
                        declarative_stream.retriever.stream_slicer,
                    )

                    final_state_cursor = FinalStateCursor(
                        stream_name=declarative_stream.name,
                        stream_namespace=declarative_stream.namespace,
                        message_repository=self.message_repository,
                    )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=None,
                            logger=self.logger,
                            cursor=final_state_cursor,
                            supports_file_transfer=supports_file_transfer,
                        )
                    )
                elif (
                    incremental_sync_component_definition
                    and incremental_sync_component_definition.get("type", "")
                    == DatetimeBasedCursorModel.__name__
                    and hasattr(declarative_stream.retriever, "stream_slicer")
                    and isinstance(
                        declarative_stream.retriever.stream_slicer,
                        (GlobalSubstreamCursor, PerPartitionWithGlobalCursor),
                    )
                ):
                    stream_state = self._connector_state_manager.get_stream_state(
                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                    )
                    stream_state = self._migrate_state(declarative_stream, stream_state)

                    partition_router = declarative_stream.retriever.stream_slicer._partition_router

                    perpartition_cursor = (
                        self._constructor.create_concurrent_cursor_from_perpartition_cursor(
                            state_manager=self._connector_state_manager,
                            model_type=DatetimeBasedCursorModel,
                            component_definition=incremental_sync_component_definition,
                            stream_name=declarative_stream.name,
                            stream_namespace=declarative_stream.namespace,
                            config=config or {},
                            stream_state=stream_state,
                            partition_router=partition_router,
                        )
                    )

                    retriever = self._get_retriever(declarative_stream, stream_state)

                    partition_generator = StreamSlicerPartitionGenerator(
                        DeclarativePartitionFactory(
                            declarative_stream.name,
                            declarative_stream.get_json_schema(),
                            retriever,
                            self.message_repository,
                        ),
                        perpartition_cursor,
                    )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
                            logger=self.logger,
                            cursor=perpartition_cursor,
                            supports_file_transfer=supports_file_transfer,
                        )
                    )
                else:
                    synchronous_streams.append(declarative_stream)
            # TODO: Remove this. This check is necessary to safely migrate Stripe during the transition state.
            # Condition below needs to ensure that concurrent support is not lost for sources that already support
            # it before migration, but now are only partially migrated to declarative implementation (e.g., Stripe).
            elif (
                isinstance(declarative_stream, AbstractStreamFacade)
                and self.is_partially_declarative
            ):
                concurrent_streams.append(declarative_stream.get_underlying_stream())
            else:
                synchronous_streams.append(declarative_stream)

        return concurrent_streams, synchronous_streams

    def _is_concurrent_cursor_incremental_without_partition_routing(
        self,
        declarative_stream: DeclarativeStream,
        incremental_sync_component_definition: Mapping[str, Any] | None,
    ) -> bool:
        return (
            incremental_sync_component_definition is not None
            and bool(incremental_sync_component_definition)
            and (
                incremental_sync_component_definition.get("type", "")
                in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__)
            )
            and hasattr(declarative_stream.retriever, "stream_slicer")
            and (
                isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
                # IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor
                # add isinstance check here if we want to create a Declarative IncrementingCountCursor
                # or isinstance(
                #     declarative_stream.retriever.stream_slicer, IncrementingCountCursor
                # )
                or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
            )
        )

    @staticmethod
    def _get_retriever(
        declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
    ) -> Retriever:
        retriever = declarative_stream.retriever

        # This is an optimization so that we don't invoke any cursor or state management flows within the
        # low-code framework because state management is handled through the ConcurrentCursor.
        if declarative_stream and isinstance(retriever, SimpleRetriever):
            # Also a temporary hack. In the legacy Stream implementation, as part of the read,
            # set_initial_state() is called to instantiate incoming state on the cursor. Although we no
            # longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
            # like StopConditionPaginationStrategyDecorator still rely on a DatetimeBasedCursor that is
            # properly initialized with state.
            if retriever.cursor:
                retriever.cursor.set_initial_state(stream_state=stream_state)

            # Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance
            # from the one initialized on the SimpleRetriever, so it must also have state initialized
            # for semi-incremental streams using is_client_side_incremental to filter properly
            if isinstance(retriever.record_selector, RecordSelector) and isinstance(
                retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
            ):
                retriever.record_selector.record_filter._cursor.set_initial_state(
                    stream_state=stream_state
                )  # type: ignore  # After non-concurrent cursors are deprecated we can remove these cursor workarounds

            # We zero it out here, but since this is a cursor reference, the state is still properly
            # instantiated for the other components that reference it
            retriever.cursor = None

        return retriever

    @staticmethod
    def _select_streams(
        streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
    ) -> List[AbstractStream]:
        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
        abstract_streams: List[AbstractStream] = []
        for configured_stream in configured_catalog.streams:
            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
            if stream_instance:
                abstract_streams.append(stream_instance)

        return abstract_streams

    @staticmethod
    def _remove_concurrent_streams_from_catalog(
        catalog: ConfiguredAirbyteCatalog,
        concurrent_stream_names: set[str],
    ) -> ConfiguredAirbyteCatalog:
        return ConfiguredAirbyteCatalog(
            streams=[
                stream
                for stream in catalog.streams
                if stream.stream.name not in concurrent_stream_names
            ]
        )

    @staticmethod
    def _migrate_state(
        declarative_stream: DeclarativeStream, stream_state: MutableMapping[str, Any]
    ) -> MutableMapping[str, Any]:
        for state_migration in declarative_stream.state_migrations:
            if state_migration.should_migrate(stream_state):
                # The state variable is expected to be mutable but the migrate method returns an immutable mapping.
                stream_state = dict(state_migration.migrate(stream_state))

        return stream_state
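The constructor above derives its worker pool from the optional `concurrency_level` entry in the manifest. The sketch below mirrors that arithmetic only; it is a minimal illustration that skips the ConcurrencyLevel component creation and interpolation and assumes a manifest shape with a `default_concurrency` field, which may differ from a real connector's manifest.

# Minimal sketch of the worker/partition math performed in __init__ (illustrative only).
_LOWEST_SAFE_CONCURRENCY_LEVEL = 2

def plan_concurrency(manifest: dict) -> tuple[int, int]:
    """Return (num_workers, initial_number_of_partitions_to_generate)."""
    # Assumption: the manifest exposes a plain integer under concurrency_level.default_concurrency.
    concurrency_level = manifest.get("concurrency_level", {}).get("default_concurrency")
    if concurrency_level:
        # Roughly half the workers generate partitions at start-up, but never fewer than one,
        # otherwise partition generation would never begin.
        return concurrency_level, max(concurrency_level // 2, 1)
    # Fall back to the lowest level that cannot deadlock the PartitionEnqueuer.
    return _LOWEST_SAFE_CONCURRENCY_LEVEL, _LOWEST_SAFE_CONCURRENCY_LEVEL // 2

print(plan_concurrency({"concurrency_level": {"type": "ConcurrencyLevel", "default_concurrency": 10}}))  # (10, 5)
print(plan_concurrency({}))  # (2, 1)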
class
ConcurrentDeclarativeSource(airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource, typing.Generic[~TState]):
Declarative source defined by a manifest of low-code components that define source connector behavior
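A minimal usage sketch, assuming a low-code `manifest.yaml` and a `config.json` that satisfy the connector's spec; the file names and the empty catalog/state are placeholders, and in practice the catalog would list the configured streams to sync.

import json
import logging
from pathlib import Path

import yaml

from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
)

# Assumed inputs: a declarative manifest and its connector configuration.
manifest = yaml.safe_load(Path("manifest.yaml").read_text())
config = json.loads(Path("config.json").read_text())
catalog = ConfiguredAirbyteCatalog(streams=[])  # placeholder; normally built from the discovered catalog

source = ConcurrentDeclarativeSource(
    catalog=catalog,
    config=config,
    state=[],  # no prior state; otherwise a list of AirbyteStateMessage
    source_config=manifest,
)

logger = logging.getLogger("airbyte")

# read() yields AirbyteMessage objects: concurrent-compatible streams are read through
# the ConcurrentSource first, and any remaining streams fall back to the synchronous read path.
for message in source.read(logger, config, catalog, state=None):
    print(message.type)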
is_partially_declarative: bool
This flag is used to avoid unexpectedly processing AbstractStreamFacade streams as concurrent streams.
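For a connector that is only partially migrated to the declarative framework (the Stripe-style case mentioned in the TODO in the source), a hypothetical subclass could opt in by overriding the property; a minimal sketch:

from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
)

class PartiallyDeclarativeSource(ConcurrentDeclarativeSource):
    # Hypothetical subclass: returning True lets _group_streams() keep the underlying
    # concurrent stream of any AbstractStreamFacade instead of demoting it to synchronous.
    @property
    def is_partially_declarative(self) -> bool:
        return True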
def
discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:
Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
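A short usage sketch, assuming `source` is an already constructed ConcurrentDeclarativeSource (for example as sketched earlier) and `config` is a valid configuration for it:

import logging

catalog = source.discover(logging.getLogger("airbyte"), config)
for stream in catalog.streams:
    # Each entry is an AirbyteStream built from either a concurrent or a synchronous stream.
    print(stream.name, stream.supported_sync_modes)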