airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from abc import ABC, ABCMeta, abstractmethod
from typing import Any, Iterable

from airbyte_cdk.sources.types import StreamSlice


class StreamSlicerMeta(ABCMeta):
    """
    Metaclass that lets ``isinstance`` checks see through objects that wrap a StreamSlicer.

    This is necessary because StreamSlicerTestReadDecorator wraps a StreamSlicer and we want to be able to check
    if an instance is a StreamSlicer, even if it is wrapped in a StreamSlicerTestReadDecorator.

    For example in ConcurrentDeclarativeSource, we do things like:
        isinstance(declarative_stream.retriever.stream_slicer, (GlobalSubstreamCursor, PerPartitionWithGlobalCursor))
    """

    def __instancecheck__(cls, instance: Any) -> bool:
        """
        Return True if ``instance``, or the slicer it wraps, is an instance of ``cls``.

        :param instance: the object being tested by ``isinstance(instance, cls)``
        :return: True when the object's ``wrapped_slicer`` attribute is an instance of ``cls``;
            otherwise the result of the regular ABCMeta instance check on ``instance`` itself
        """
        # A wrapper is treated as an instance of whatever class its wrapped slicer belongs to.
        if hasattr(instance, "wrapped_slicer") and isinstance(instance.wrapped_slicer, cls):
            return True

        # The wrapped slicer did not match (or there is none): the object may still be a genuine
        # instance of cls in its own right, so defer to the normal check instead of returning False.
        return super().__instancecheck__(instance)


class StreamSlicer(ABC, metaclass=StreamSlicerMeta):
    """
    Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
    """

    @abstractmethod
    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Defines stream slices

        :return: An iterable of stream slices
        """
        pass
class
StreamSlicerMeta(abc.ABCMeta):
class StreamSlicerMeta(ABCMeta):
    """
    Metaclass that lets ``isinstance`` checks see through objects that wrap a StreamSlicer.

    This is necessary because StreamSlicerTestReadDecorator wraps a StreamSlicer and we want to be able to check
    if an instance is a StreamSlicer, even if it is wrapped in a StreamSlicerTestReadDecorator.

    For example in ConcurrentDeclarativeSource, we do things like:
        isinstance(declarative_stream.retriever.stream_slicer, (GlobalSubstreamCursor, PerPartitionWithGlobalCursor))
    """

    def __instancecheck__(cls, instance: Any) -> bool:
        """
        Return True if ``instance``, or the slicer it wraps, is an instance of ``cls``.

        :param instance: the object being tested by ``isinstance(instance, cls)``
        :return: True when the object's ``wrapped_slicer`` attribute is an instance of ``cls``;
            otherwise the result of the regular ABCMeta instance check on ``instance`` itself
        """
        # A wrapper is treated as an instance of whatever class its wrapped slicer belongs to.
        if hasattr(instance, "wrapped_slicer") and isinstance(instance.wrapped_slicer, cls):
            return True

        # The wrapped slicer did not match (or there is none): the object may still be a genuine
        # instance of cls in its own right, so defer to the normal check instead of returning False.
        return super().__instancecheck__(instance)
Metaclass for the wrapper scenario: it allows an object that wraps a StreamSlicer to pass type checks against StreamSlicer classes. This is necessary because StreamSlicerTestReadDecorator wraps a StreamSlicer and we want to be able to check if an instance is a StreamSlicer, even if it is wrapped in a StreamSlicerTestReadDecorator.
For example, in ConcurrentDeclarativeSource we do things like: isinstance(declarative_stream.retriever.stream_slicer, (GlobalSubstreamCursor, PerPartitionWithGlobalCursor))
class
StreamSlicer(abc.ABC):
class StreamSlicer(ABC, metaclass=StreamSlicerMeta):
    """
    Abstract base for objects that split a stream into independently fetchable chunks ("slices").

    Slicing is what makes state checkpointing and parallel data retrieval possible.
    """

    @abstractmethod
    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Produce the slices of the stream.

        Concrete subclasses must override this method.

        :return: An iterable of stream slices
        """
Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.