airbyte_cdk.sources.streams.concurrent.partition_reader

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4from queue import Queue
 5
 6from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
 7from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 8from airbyte_cdk.sources.streams.concurrent.partitions.types import (
 9    PartitionCompleteSentinel,
10    QueueItem,
11)
12
13
class PartitionReader:
    """
    Generates records from a partition and puts them in a queue.
    """

    # Status flag handed to PartitionCompleteSentinel; its negation marks a failed partition.
    _IS_SUCCESSFUL = True

    def __init__(self, queue: Queue[QueueItem]) -> None:
        """
        :param queue: The queue to put the records in. The same queue also receives a
            StreamThreadException when reading a partition fails, and a
            PartitionCompleteSentinel once each partition has been processed.
        """
        self._queue = queue

    def process_partition(self, partition: Partition) -> None:
        """
        Read every record of a partition and put it in the output queue.
        Once all the records of the partition have been added to the queue, a PartitionCompleteSentinel
        flagged as successful is added to indicate that this partition has been fully processed.

        If an exception is encountered, it is caught and put in the queue (wrapped in a
        StreamThreadException), followed by a PartitionCompleteSentinel flagged as unsuccessful.
        This is very important because otherwise the main thread would have no way to know that
        something went wrong and would wait until the timeout is reached.

        This method is meant to be called from a thread.
        :param partition: The partition to read data from
        :return: None
        """
        try:
            for record in partition.read():
                self._queue.put(record)
            # The sentinel is the last item put for this partition, so the consumer can tell it is done.
            self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
        except Exception as e:
            # Surface the failure through the queue instead of letting it escape the worker thread.
            self._queue.put(StreamThreadException(e, partition.stream_name()))
            self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))
class PartitionReader:
class PartitionReader:
    """
    Generates records from a partition and puts them in a queue.
    """

    # Status flag handed to PartitionCompleteSentinel; its negation marks a failed partition.
    _IS_SUCCESSFUL = True

    def __init__(self, queue: Queue[QueueItem]) -> None:
        """
        :param queue: The queue to put the records in. The same queue also receives a
            StreamThreadException when reading a partition fails, and a
            PartitionCompleteSentinel once each partition has been processed.
        """
        self._queue = queue

    def process_partition(self, partition: Partition) -> None:
        """
        Read every record of a partition and put it in the output queue.
        Once all the records of the partition have been added to the queue, a PartitionCompleteSentinel
        flagged as successful is added to indicate that this partition has been fully processed.

        If an exception is encountered, it is caught and put in the queue (wrapped in a
        StreamThreadException), followed by a PartitionCompleteSentinel flagged as unsuccessful.
        This is very important because otherwise the main thread would have no way to know that
        something went wrong and would wait until the timeout is reached.

        This method is meant to be called from a thread.
        :param partition: The partition to read data from
        :return: None
        """
        try:
            for record in partition.read():
                self._queue.put(record)
            # The sentinel is the last item put for this partition, so the consumer can tell it is done.
            self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
        except Exception as e:
            # Surface the failure through the queue instead of letting it escape the worker thread.
            self._queue.put(StreamThreadException(e, partition.stream_name()))
            self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))

Generates records from a partition and puts them in a queue.

22    def __init__(self, queue: Queue[QueueItem]) -> None:
23        """
24        :param queue: The queue to put the records in.
25        """
26        self._queue = queue
Parameters
  • queue: The queue to put the records in.
def process_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
28    def process_partition(self, partition: Partition) -> None:
29        """
30        Process a partition and put the records in the output queue.
31        When all the partitions are added to the queue, a sentinel is added to the queue to indicate that all the partitions have been generated.
32
33        If an exception is encountered, the exception will be caught and put in the queue. This is very important because if we don't, the
34        main thread will have no way to know that something when wrong and will wait until the timeout is reached
35
36        This method is meant to be called from a thread.
37        :param partition: The partition to read data from
38        :return: None
39        """
40        try:
41            for record in partition.read():
42                self._queue.put(record)
43            self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
44        except Exception as e:
45            self._queue.put(StreamThreadException(e, partition.stream_name()))
46            self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))

Process a partition and put its records in the output queue. Once all the records of the partition have been added to the queue, a sentinel is added to the queue to indicate that the partition has been fully processed.

If an exception is encountered, the exception will be caught and put in the queue, followed by a sentinel marking the partition as unsuccessful. This is very important because if we don't, the main thread will have no way to know that something went wrong and will wait until the timeout is reached.

This method is meant to be called from a thread.

Parameters
  • partition: The partition to read data from
Returns

None