airbyte_cdk.sources.streams.concurrent.partition_reader

 1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 2
 3import logging
 4from queue import Queue
 5from typing import Optional
 6
 7from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
 8from airbyte_cdk.sources.message.repository import MessageRepository
 9from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
10from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
11from airbyte_cdk.sources.streams.concurrent.partitions.types import (
12    PartitionCompleteSentinel,
13    QueueItem,
14)
15from airbyte_cdk.sources.utils.slice_logger import SliceLogger
16
17
# NOTE: Now that the connector builder workflow has moved to the concurrent CDK, which requires
# log messages to be grouped in the correct order on the main write thread by the
# ConcurrentMessageRepository, this separate flow and class for logging slices onto the
# partition's message_repository should be replaced by emitting messages directly onto the
# repository instead of going through an intermediary.
class PartitionLogger:
    """
    Helper that emits a slice log message for a partition onto a message repository.

    A message is only emitted when the configured ``SliceLogger`` reports that slice
    messages should be logged for the configured logger.
    """

    def __init__(
        self,
        slice_logger: SliceLogger,
        logger: logging.Logger,
        message_repository: MessageRepository,
    ):
        """
        :param slice_logger: Decides whether slice messages should be logged and builds the slice log message.
        :param logger: Logger handed to ``slice_logger`` when deciding whether to log.
        :param message_repository: Repository onto which slice log messages are emitted.
        """
        self._message_repository = message_repository
        self._logger = logger
        self._slice_logger = slice_logger

    def log(self, partition: Partition) -> None:
        """
        Emit a slice log message for ``partition`` if slice logging is enabled.

        :param partition: The partition whose slice (``partition.to_slice()``) is logged.
        """
        if not self._slice_logger.should_log_slice_message(self._logger):
            return
        slice_log_message = self._slice_logger.create_slice_log_message(partition.to_slice())
        self._message_repository.emit_message(slice_log_message)
43
44
class PartitionReader:
    """
    Reads the records of a partition and puts them in a queue, followed by a completion sentinel.
    """

    # Success flag carried by the PartitionCompleteSentinel; the failure path uses its negation.
    _IS_SUCCESSFUL = True

    def __init__(
        self,
        queue: Queue[QueueItem],
        partition_logger: Optional[PartitionLogger] = None,
    ) -> None:
        """
        :param queue: The queue to put the records, exceptions and completion sentinels in.
        :param partition_logger: Optional logger used to emit a slice log message for each
            partition before it is read. When None, nothing is logged.
        """
        self._queue = queue
        self._partition_logger = partition_logger

    def process_partition(self, partition: Partition, cursor: Cursor) -> None:
        """
        Read a partition and put its records in the output queue.

        Each record is put in the queue and then observed by the cursor. Once all of the
        partition's records have been queued, the cursor closes the partition and a successful
        ``PartitionCompleteSentinel`` is put in the queue to signal that this partition has
        been fully processed.

        If an exception is encountered (while logging, reading, observing or closing), it is
        caught and put in the queue as a ``StreamThreadException``, followed by an unsuccessful
        ``PartitionCompleteSentinel``. This is very important because otherwise the main thread
        would have no way to know that something went wrong and would wait until the timeout
        is reached.

        This method is meant to be called from a thread.

        :param partition: The partition to read data from.
        :param cursor: The cursor that observes each record and closes the partition once all
            of its records have been read.
        :return: None
        """
        try:
            if self._partition_logger is not None:
                self._partition_logger.log(partition)

            for record in partition.read():
                self._queue.put(record)
                cursor.observe(record)
            cursor.close_partition(partition)
            self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
        except Exception as e:
            # Hand the failure to the queue consumer instead of swallowing it in this thread.
            self._queue.put(StreamThreadException(e, partition.stream_name()))
            self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))
class PartitionLogger:
class PartitionLogger:
    """
    Helper that emits a slice log message for a partition onto a message repository.

    A message is only emitted when the configured ``SliceLogger`` reports that slice
    messages should be logged for the configured logger.
    """

    def __init__(
        self,
        slice_logger: SliceLogger,
        logger: logging.Logger,
        message_repository: MessageRepository,
    ):
        """
        :param slice_logger: Decides whether slice messages should be logged and builds the slice log message.
        :param logger: Logger handed to ``slice_logger`` when deciding whether to log.
        :param message_repository: Repository onto which slice log messages are emitted.
        """
        self._message_repository = message_repository
        self._logger = logger
        self._slice_logger = slice_logger

    def log(self, partition: Partition) -> None:
        """
        Emit a slice log message for ``partition`` if slice logging is enabled.

        :param partition: The partition whose slice (``partition.to_slice()``) is logged.
        """
        if not self._slice_logger.should_log_slice_message(self._logger):
            return
        slice_log_message = self._slice_logger.create_slice_log_message(partition.to_slice())
        self._message_repository.emit_message(slice_log_message)

Helper class that provides a mechanism for passing a log message onto the current partition's message repository.

PartitionLogger( slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, logger: logging.Logger, message_repository: airbyte_cdk.MessageRepository)
def __init__(
    self,
    slice_logger: SliceLogger,
    logger: logging.Logger,
    message_repository: MessageRepository,
):
    """
    :param slice_logger: Decides whether slice messages should be logged and builds the slice log message.
    :param logger: Logger handed to ``slice_logger`` when deciding whether to log.
    :param message_repository: Repository onto which slice log messages are emitted.
    """
    self._message_repository = message_repository
    self._logger = logger
    self._slice_logger = slice_logger
def log( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
def log(self, partition: Partition) -> None:
    """
    Emit a slice log message for ``partition`` if slice logging is enabled.

    :param partition: The partition whose slice (``partition.to_slice()``) is logged.
    """
    if not self._slice_logger.should_log_slice_message(self._logger):
        return
    slice_log_message = self._slice_logger.create_slice_log_message(partition.to_slice())
    self._message_repository.emit_message(slice_log_message)
class PartitionReader:
class PartitionReader:
    """
    Reads the records of a partition and puts them in a queue, followed by a completion sentinel.
    """

    # Success flag carried by the PartitionCompleteSentinel; the failure path uses its negation.
    _IS_SUCCESSFUL = True

    def __init__(
        self,
        queue: Queue[QueueItem],
        partition_logger: Optional[PartitionLogger] = None,
    ) -> None:
        """
        :param queue: The queue to put the records, exceptions and completion sentinels in.
        :param partition_logger: Optional logger used to emit a slice log message for each
            partition before it is read. When None, nothing is logged.
        """
        self._queue = queue
        self._partition_logger = partition_logger

    def process_partition(self, partition: Partition, cursor: Cursor) -> None:
        """
        Read a partition and put its records in the output queue.

        Each record is put in the queue and then observed by the cursor. Once all of the
        partition's records have been queued, the cursor closes the partition and a successful
        ``PartitionCompleteSentinel`` is put in the queue to signal that this partition has
        been fully processed.

        If an exception is encountered (while logging, reading, observing or closing), it is
        caught and put in the queue as a ``StreamThreadException``, followed by an unsuccessful
        ``PartitionCompleteSentinel``. This is very important because otherwise the main thread
        would have no way to know that something went wrong and would wait until the timeout
        is reached.

        This method is meant to be called from a thread.

        :param partition: The partition to read data from.
        :param cursor: The cursor that observes each record and closes the partition once all
            of its records have been read.
        :return: None
        """
        try:
            if self._partition_logger is not None:
                self._partition_logger.log(partition)

            for record in partition.read():
                self._queue.put(record)
                cursor.observe(record)
            cursor.close_partition(partition)
            self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
        except Exception as e:
            # Hand the failure to the queue consumer instead of swallowing it in this thread.
            self._queue.put(StreamThreadException(e, partition.stream_name()))
            self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))

Generates records from a partition and puts them in a queue.

def __init__(
    self,
    queue: Queue[QueueItem],
    partition_logger: Optional[PartitionLogger] = None,
) -> None:
    """
    :param queue: The queue to put the records, exceptions and completion sentinels in.
    :param partition_logger: Optional logger used to emit a slice log message for each
        partition before it is read. When None, nothing is logged.
    """
    self._queue = queue
    self._partition_logger = partition_logger
Parameters
  • queue: The queue to put the records in.
def process_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition, cursor: airbyte_cdk.Cursor) -> None:
def process_partition(self, partition: Partition, cursor: Cursor) -> None:
    """
    Read a partition and put its records in the output queue.

    Each record is put in the queue and then observed by the cursor. Once all of the
    partition's records have been queued, the cursor closes the partition and a successful
    ``PartitionCompleteSentinel`` is put in the queue to signal that this partition has
    been fully processed.

    If an exception is encountered (while logging, reading, observing or closing), it is
    caught and put in the queue as a ``StreamThreadException``, followed by an unsuccessful
    ``PartitionCompleteSentinel``. This is very important because otherwise the main thread
    would have no way to know that something went wrong and would wait until the timeout
    is reached.

    This method is meant to be called from a thread.

    :param partition: The partition to read data from.
    :param cursor: The cursor that observes each record and closes the partition once all
        of its records have been read.
    :return: None
    """
    try:
        if self._partition_logger is not None:
            self._partition_logger.log(partition)

        for record in partition.read():
            self._queue.put(record)
            cursor.observe(record)
        cursor.close_partition(partition)
        self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
    except Exception as e:
        # Hand the failure to the queue consumer instead of swallowing it in this thread.
        self._queue.put(StreamThreadException(e, partition.stream_name()))
        self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))

Process a partition and put its records in the output queue. When all of the partition's records have been added to the queue, a sentinel is added to the queue to indicate that this partition has been fully processed.

If an exception is encountered, it is caught and put in the queue, followed by a sentinel marking the partition as unsuccessful. This is very important because otherwise the main thread would have no way to know that something went wrong and would wait until the timeout is reached.

This method is meant to be called from a thread.

Parameters
  • partition: The partition to read data from
Returns

None