airbyte_cdk.sources.declarative.expanders

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.expanders.record_expander import (
    OnNoRecords,
    RecordExpander,
)

# Names re-exported as the public API of the expanders package.
__all__ = [
    "OnNoRecords",
    "RecordExpander",
]
class OnNoRecords(enum.Enum):
class OnNoRecords(Enum):
    """Policy applied when record expansion produces no records.

    Members:
        skip: emit nothing for the parent record (the RecordExpander default).
        emit_parent: emit the original parent record unchanged.
    """

    skip = "skip"  # drop the parent record entirely
    emit_parent = "emit_parent"  # pass the parent record through as-is

Behavior when record expansion produces no records.

skip = <OnNoRecords.skip: 'skip'>
emit_parent = <OnNoRecords.emit_parent: 'emit_parent'>
@dataclass
class RecordExpander:
 26@dataclass
 27class RecordExpander:
 28    """Expands records by extracting items from a nested array field.
 29
 30    When configured, this component extracts items from a specified nested array path
 31    within each record and emits each item as a separate record. Set `remain_original_record: true`
 32    to embed the full parent record under `original_record` in each expanded item when you need
 33    downstream transformations to access parent context.
 34
 35    The expand_records_from_field path supports wildcards (*) for matching multiple arrays.
 36    When wildcards are used, items from all matched arrays are extracted and emitted.
 37
 38    Examples of instantiating this component:
 39    ```
 40      record_expander:
 41        type: RecordExpander
 42        expand_records_from_field:
 43          - "lines"
 44          - "data"
 45        remain_original_record: true
 46    ```
 47
 48    ```
 49      record_expander:
 50        type: RecordExpander
 51        expand_records_from_field:
 52          - "sections"
 53          - "*"
 54          - "items"
 55        on_no_records: emit_parent
 56    ```
 57
 58    Attributes:
 59        expand_records_from_field: Path to a nested array field within each record.
 60            Items from this array will be extracted and emitted as separate records.
 61            Supports wildcards (*).
 62        remain_original_record: If True, each expanded record will include the original
 63            parent record in an "original_record" field. Defaults to False.
 64        on_no_records: Behavior when expansion produces no records. "skip" (default)
 65            emits nothing. "emit_parent" emits the original parent record unchanged.
 66        config: The user-provided configuration as specified by the source's spec.
 67    """
 68
 69    expand_records_from_field: Sequence[str]
 70    config: Config
 71    parameters: InitVar[Mapping[str, Any]]
 72    remain_original_record: bool = False
 73    on_no_records: OnNoRecords = OnNoRecords.skip
 74
 75    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 76        self._expand_path: list[InterpolatedString] = [
 77            InterpolatedString.create(path, parameters=parameters)
 78            for path in self.expand_records_from_field
 79        ]
 80
 81    def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]:
 82        """Expand a record by extracting items from a nested array field."""
 83        if not isinstance(record, Mapping):
 84            # If the input isn't a mapping, expansion can't proceed; yield as-is.
 85            yield record
 86            return
 87
 88        if not self._expand_path:
 89            yield record
 90            return
 91
 92        parent_record = record
 93        expand_path = [path.eval(self.config) for path in self._expand_path]
 94        expanded_any = False
 95
 96        try:
 97            extracted_values = dpath.values(parent_record, expand_path)
 98        except KeyError:
 99            extracted_values = []
100
101        for extracted in extracted_values:
102            if not isinstance(extracted, list):
103                continue
104            items = extracted
105            for item in items:
106                if isinstance(item, dict):
107                    expanded_record = dict(item)
108                    self._apply_parent_context(parent_record, expanded_record)
109                    yield expanded_record
110                    expanded_any = True
111                else:
112                    if self.remain_original_record:
113                        yield {
114                            "value": item,
115                            "original_record": copy.deepcopy(parent_record),
116                        }
117                    else:
118                        yield item
119                    expanded_any = True
120
121        if not expanded_any and self.on_no_records == OnNoRecords.emit_parent:
122            yield parent_record
123
124    def _apply_parent_context(
125        self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any]
126    ) -> None:
127        """Apply parent context to a child record."""
128        if self.remain_original_record:
129            child_record["original_record"] = copy.deepcopy(parent_record)

Expands records by extracting items from a nested array field.

When configured, this component extracts items from a specified nested array path within each record and emits each item as a separate record. Set `remain_original_record: true` to embed the full parent record under `original_record` in each expanded item when you need downstream transformations to access parent context.

The `expand_records_from_field` path supports wildcards (`*`) for matching multiple arrays. When wildcards are used, items from all matched arrays are extracted and emitted.

Examples of instantiating this component:

```
  record_expander:
    type: RecordExpander
    expand_records_from_field:
      - "lines"
      - "data"
    remain_original_record: true
```

```
  record_expander:
    type: RecordExpander
    expand_records_from_field:
      - "sections"
      - "*"
      - "items"
    on_no_records: emit_parent
```
Attributes:
  • expand_records_from_field: Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*).
  • remain_original_record: If True, each expanded record will include the original parent record in an "original_record" field. Defaults to False.
  • on_no_records: Behavior when expansion produces no records. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.
  • config: The user-provided configuration as specified by the source's spec.
RecordExpander( expand_records_from_field: Sequence[str], config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], remain_original_record: bool = False, on_no_records: OnNoRecords = <OnNoRecords.skip: 'skip'>)
expand_records_from_field: Sequence[str]
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
remain_original_record: bool = False
on_no_records: OnNoRecords = <OnNoRecords.skip: 'skip'>
def expand_record( self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]:
 81    def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]:
 82        """Expand a record by extracting items from a nested array field."""
 83        if not isinstance(record, Mapping):
 84            # If the input isn't a mapping, expansion can't proceed; yield as-is.
 85            yield record
 86            return
 87
 88        if not self._expand_path:
 89            yield record
 90            return
 91
 92        parent_record = record
 93        expand_path = [path.eval(self.config) for path in self._expand_path]
 94        expanded_any = False
 95
 96        try:
 97            extracted_values = dpath.values(parent_record, expand_path)
 98        except KeyError:
 99            extracted_values = []
100
101        for extracted in extracted_values:
102            if not isinstance(extracted, list):
103                continue
104            items = extracted
105            for item in items:
106                if isinstance(item, dict):
107                    expanded_record = dict(item)
108                    self._apply_parent_context(parent_record, expanded_record)
109                    yield expanded_record
110                    expanded_any = True
111                else:
112                    if self.remain_original_record:
113                        yield {
114                            "value": item,
115                            "original_record": copy.deepcopy(parent_record),
116                        }
117                    else:
118                        yield item
119                    expanded_any = True
120
121        if not expanded_any and self.on_no_records == OnNoRecords.emit_parent:
122            yield parent_record

Expand a record by extracting items from a nested array field.