airbyte_cdk.sources.streams.concurrent.partition_reader
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from queue import Queue

from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.types import (
    PartitionCompleteSentinel,
    QueueItem,
)


class PartitionReader:
    """
    Generates records from a partition and puts them in a queue.
    """

    # Flag passed to PartitionCompleteSentinel: used as-is on the success path,
    # negated (`not _IS_SUCCESSFUL`) on the failure path.
    _IS_SUCCESSFUL = True

    def __init__(self, queue: Queue[QueueItem]) -> None:
        """
        :param queue: The queue to put the records in.
        """
        self._queue = queue

    def process_partition(self, partition: Partition) -> None:
        """
        Process a partition and put its records in the output queue.
        Once every record of this partition has been queued, a PartitionCompleteSentinel is added to the
        queue to indicate that this partition has been fully read.

        If an exception is encountered, it is caught (not re-raised) and put in the queue wrapped in a
        StreamThreadException, followed by a PartitionCompleteSentinel flagged as unsuccessful. This is very
        important because otherwise the main thread would have no way to know that something went wrong and
        would wait until the timeout is reached.

        This method is meant to be called from a thread.
        :param partition: The partition to read data from
        :return: None
        """
        try:
            for record in partition.read():
                self._queue.put(record)
            # The success sentinel stays inside the try so that a failure while emitting it is still
            # reported to the main thread through the except branch below.
            self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
        except Exception as e:
            self._queue.put(StreamThreadException(e, partition.stream_name()))
            self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))
class
PartitionReader:
class PartitionReader:
    """
    Generates records from a partition and puts them in a queue.
    """

    # Flag passed to PartitionCompleteSentinel: used as-is on the success path,
    # negated (`not _IS_SUCCESSFUL`) on the failure path.
    _IS_SUCCESSFUL = True

    def __init__(self, queue: Queue[QueueItem]) -> None:
        """
        :param queue: The queue to put the records in.
        """
        self._queue = queue

    def process_partition(self, partition: Partition) -> None:
        """
        Process a partition and put its records in the output queue.
        Once every record of this partition has been queued, a PartitionCompleteSentinel is added to the
        queue to indicate that this partition has been fully read.

        If an exception is encountered, it is caught (not re-raised) and put in the queue wrapped in a
        StreamThreadException, followed by a PartitionCompleteSentinel flagged as unsuccessful. This is very
        important because otherwise the main thread would have no way to know that something went wrong and
        would wait until the timeout is reached.

        This method is meant to be called from a thread.
        :param partition: The partition to read data from
        :return: None
        """
        try:
            for record in partition.read():
                self._queue.put(record)
            # The success sentinel stays inside the try so that a failure while emitting it is still
            # reported to the main thread through the except branch below.
            self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
        except Exception as e:
            self._queue.put(StreamThreadException(e, partition.stream_name()))
            self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))
Generates records from a partition and puts them in a queue.
PartitionReader( queue: queue.Queue[typing.Union[airbyte_cdk.Record, airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition, airbyte_cdk.sources.streams.concurrent.partitions.types.PartitionCompleteSentinel, airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel.PartitionGenerationCompletedSentinel, Exception]])
def __init__(self, queue: Queue[QueueItem]) -> None:
    """
    Remember the queue this reader publishes to.

    :param queue: The queue to put the records in.
    """
    # process_partition() writes records, exceptions and completion sentinels here.
    self._queue: Queue[QueueItem] = queue
Parameters
- queue: The queue to put the records in.
def
process_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
def process_partition(self, partition: Partition) -> None:
    """
    Process a partition and put its records in the output queue.
    Once every record of this partition has been queued, a PartitionCompleteSentinel is added to the
    queue to indicate that this partition has been fully read.

    If an exception is encountered, it is caught (not re-raised) and put in the queue wrapped in a
    StreamThreadException, followed by a PartitionCompleteSentinel flagged as unsuccessful. This is very
    important because otherwise the main thread would have no way to know that something went wrong and
    would wait until the timeout is reached.

    This method is meant to be called from a thread.
    :param partition: The partition to read data from
    :return: None
    """
    try:
        for record in partition.read():
            self._queue.put(record)
        # The success sentinel stays inside the try so that a failure while emitting it is still
        # reported to the main thread through the except branch below.
        self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
    except Exception as e:
        self._queue.put(StreamThreadException(e, partition.stream_name()))
        self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))
Process a partition and put its records in the output queue. Once every record of this partition has been queued, a sentinel is added to the queue to indicate that this partition has been fully read.
If an exception is encountered, it is caught and put in the queue, followed by a sentinel marking the partition as unsuccessful. This is very important because otherwise the main thread would have no way to know that something went wrong and would wait until the timeout is reached.
This method is meant to be called from a thread.
Parameters
- partition: The partition to read data from
Returns
None