airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2
 3from abc import ABC, ABCMeta, abstractmethod
 4from typing import Any, Iterable
 5
 6from airbyte_cdk.sources.types import StreamSlice
 7
 8
 9class StreamSlicerMeta(ABCMeta):
10    """
11    Metaclass for wrapper scenario that allows it to be used as a type check for StreamSlicer.
12    This is necessary because StreamSlicerTestReadDecorator wraps a StreamSlicer and we want to be able to check
13    if an instance is a StreamSlicer, even if it is wrapped in a StreamSlicerTestReadDecorator.
14
15    For example in ConcurrentDeclarativeSource, we do things like:
16        isinstance(declarative_stream.retriever.stream_slicer,(GlobalSubstreamCursor, PerPartitionWithGlobalCursor))
17    """
18
19    def __instancecheck__(cls, instance: Any) -> bool:
20        # Check if it's our wrapper with matching wrapped class
21        if hasattr(instance, "wrapped_slicer"):
22            return isinstance(instance.wrapped_slicer, cls)
23
24        return super().__instancecheck__(instance)
25
26
class StreamSlicer(ABC, metaclass=StreamSlicerMeta):
    """
    Abstract interface for splitting a stream into chunks that can each be fetched on their own.

    Slicing a stream is what makes state checkpointing and parallel data retrieval possible.
    """

    @abstractmethod
    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Produce the slices that partition this stream.

        :return: An iterable of stream slices
        """
        ...
class StreamSlicerMeta(abc.ABCMeta):
class StreamSlicerMeta(ABCMeta):
    """
    Metaclass that makes isinstance checks see through a slicer wrapper.

    This is necessary because StreamSlicerTestReadDecorator wraps a StreamSlicer (exposed as its
    ``wrapped_slicer`` attribute) and we want to be able to check whether an instance is a StreamSlicer,
    even if it is wrapped in a StreamSlicerTestReadDecorator.

    ``isinstance(obj, cls)`` is True when ``obj`` is an instance of ``cls`` in the ordinary sense, or when
    ``obj`` has a ``wrapped_slicer`` attribute whose value is (recursively) an instance of ``cls``.

    For example in ConcurrentDeclarativeSource, we do things like:
        isinstance(declarative_stream.retriever.stream_slicer, (GlobalSubstreamCursor, PerPartitionWithGlobalCursor))
    """

    def __instancecheck__(cls, instance: Any) -> bool:
        """
        Decide whether ``instance`` should be treated as an instance of ``cls``.

        :param instance: The object passed as the first argument to ``isinstance``.
        :return: True if ``instance`` is an ordinary instance of ``cls`` (including ABC virtual
            subclasses), or if it wraps such an instance via ``wrapped_slicer``.
        """
        # Ordinary inheritance is checked first so that a wrapper still satisfies isinstance checks
        # against its own class hierarchy, even when the object it wraps is of an unrelated type.
        if super().__instancecheck__(instance):
            return True

        # Otherwise delegate to the wrapped slicer, so the wrapper is transparent to type checks.
        # NOTE: objects that answer every attribute lookup (e.g. a spec-less unittest.mock.Mock) keep
        # producing new ``wrapped_slicer`` values, so for them this recursion ends in RecursionError.
        if hasattr(instance, "wrapped_slicer"):
            return isinstance(instance.wrapped_slicer, cls)

        return False

Metaclass for the wrapper scenario: it lets isinstance checks against StreamSlicer classes see through a wrapper to the slicer it wraps. This is necessary because StreamSlicerTestReadDecorator wraps a StreamSlicer, and we want to be able to check whether an instance is a StreamSlicer even if it is wrapped in a StreamSlicerTestReadDecorator.

For example in ConcurrentDeclarativeSource, we do things like: isinstance(declarative_stream.retriever.stream_slicer,(GlobalSubstreamCursor, PerPartitionWithGlobalCursor))

class StreamSlicer(abc.ABC):
class StreamSlicer(ABC, metaclass=StreamSlicerMeta):
    """
    Abstract interface for splitting a stream into chunks that can each be fetched on their own.

    Slicing a stream is what makes state checkpointing and parallel data retrieval possible.
    """

    @abstractmethod
    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Produce the slices that partition this stream.

        :return: An iterable of stream slices
        """
        ...

Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.

@abstractmethod
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
33    @abstractmethod
34    def stream_slices(self) -> Iterable[StreamSlice]:
35        """
36        Defines stream slices
37
38        :return: An iterable of stream slices
39        """
40        pass

Defines stream slices

Returns

An iterable of stream slices