airbyte_cdk.sources.declarative.expanders
class OnNoRecords(enum.Enum):
class OnNoRecords(Enum):
    """
    Behavior when record expansion produces no records.
    """

    skip = "skip"
    emit_parent = "emit_parent"
Behavior when record expansion produces no records.
skip = <OnNoRecords.skip: 'skip'>
emit_parent = <OnNoRecords.emit_parent: 'emit_parent'>
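As a small illustration of how the two members relate to the strings used in a manifest, the sketch below redefines the enum locally (a copy of the listing above, so the snippet runs without the CDK installed) and looks a member up by value. How the CDK itself converts the manifest string is not shown in this section, so treat the lookup as standard `Enum` behavior rather than a description of the CDK's parsing.

```python
from enum import Enum


class OnNoRecords(Enum):
    """Local copy of the enum from the listing above, for illustration only."""

    skip = "skip"
    emit_parent = "emit_parent"


# Look up a member from the string value that appears in a manifest.
mode = OnNoRecords("emit_parent")
is_emit_parent = mode is OnNoRecords.emit_parent

# An unknown string raises ValueError instead of silently falling back.
try:
    OnNoRecords("drop")
    rejected = False
except ValueError:
    rejected = True
```

Because members are singletons, comparing with `is` or `==` against `OnNoRecords.emit_parent` behaves the same way.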
@dataclass
class RecordExpander:
@dataclass
class RecordExpander:
    """Expands records by extracting items from a nested array field.

    When configured, this component extracts items from a specified nested array path
    within each record and emits each item as a separate record. Set `remain_original_record: true`
    to embed the full parent record under `original_record` in each expanded item when you need
    downstream transformations to access parent context.

    The expand_records_from_field path supports wildcards (*) for matching multiple arrays.
    When wildcards are used, items from all matched arrays are extracted and emitted.

    Examples of instantiating this component:
    ```
    record_expander:
      type: RecordExpander
      expand_records_from_field:
        - "lines"
        - "data"
      remain_original_record: true
    ```

    ```
    record_expander:
      type: RecordExpander
      expand_records_from_field:
        - "sections"
        - "*"
        - "items"
      on_no_records: emit_parent
    ```

    Attributes:
        expand_records_from_field: Path to a nested array field within each record.
            Items from this array will be extracted and emitted as separate records.
            Supports wildcards (*).
        remain_original_record: If True, each expanded record will include the original
            parent record in an "original_record" field. Defaults to False.
        on_no_records: Behavior when expansion produces no records. "skip" (default)
            emits nothing. "emit_parent" emits the original parent record unchanged.
        config: The user-provided configuration as specified by the source's spec.
    """

    expand_records_from_field: Sequence[str]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    remain_original_record: bool = False
    on_no_records: OnNoRecords = OnNoRecords.skip

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._expand_path: list[InterpolatedString] = [
            InterpolatedString.create(path, parameters=parameters)
            for path in self.expand_records_from_field
        ]

    def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]:
        """Expand a record by extracting items from a nested array field."""
        if not isinstance(record, Mapping):
            # If the input isn't a mapping, expansion can't proceed; yield as-is.
            yield record
            return

        if not self._expand_path:
            yield record
            return

        parent_record = record
        expand_path = [path.eval(self.config) for path in self._expand_path]
        expanded_any = False

        try:
            extracted_values = dpath.values(parent_record, expand_path)
        except KeyError:
            extracted_values = []

        for extracted in extracted_values:
            if not isinstance(extracted, list):
                continue
            items = extracted
            for item in items:
                if isinstance(item, dict):
                    expanded_record = dict(item)
                    self._apply_parent_context(parent_record, expanded_record)
                    yield expanded_record
                    expanded_any = True
                else:
                    if self.remain_original_record:
                        yield {
                            "value": item,
                            "original_record": copy.deepcopy(parent_record),
                        }
                    else:
                        yield item
                    expanded_any = True

        if not expanded_any and self.on_no_records == OnNoRecords.emit_parent:
            yield parent_record

    def _apply_parent_context(
        self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any]
    ) -> None:
        """Apply parent context to a child record."""
        if self.remain_original_record:
            child_record["original_record"] = copy.deepcopy(parent_record)
Expands records by extracting items from a nested array field.
When configured, this component extracts items from a specified nested array path
within each record and emits each item as a separate record. Set `remain_original_record: true`
to embed the full parent record under `original_record` in each expanded item when you need
downstream transformations to access parent context.
The expand_records_from_field path supports wildcards (*) for matching multiple arrays. When wildcards are used, items from all matched arrays are extracted and emitted.
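To make the wildcard behavior concrete, here is a minimal sketch of how a path such as `["sections", "*", "items"]` can select several arrays at once. The component itself delegates matching to `dpath.values`; the `match_path` helper below is a hypothetical, simplified stand-in that only walks dictionaries and treats `*` as "any key at this level", so it should not be read as the library's matching rules.

```python
from typing import Any, List, Sequence


def match_path(node: Any, path: Sequence[str]) -> List[Any]:
    """Return every value reachable via `path`; "*" matches any key at that level.

    Hypothetical stand-in for illustration only; not the dpath implementation.
    """
    if not path:
        return [node]
    if not isinstance(node, dict):
        return []
    head, rest = path[0], path[1:]
    if head == "*":
        keys = list(node)
    else:
        keys = [head] if head in node else []
    matches: List[Any] = []
    for key in keys:
        matches.extend(match_path(node[key], rest))
    return matches


record = {
    "sections": {
        "a": {"items": [{"id": 1}, {"id": 2}]},
        "b": {"items": [{"id": 3}]},
    }
}

# Two arrays match the wildcard path; their items are flattened in order.
arrays = match_path(record, ["sections", "*", "items"])
items = [item for array in arrays for item in array]
```

Here `items` holds the three dictionaries from both `a` and `b`, which corresponds to the "items from all matched arrays are extracted and emitted" behavior described above.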
Examples of instantiating this component:
record_expander:
  type: RecordExpander
  expand_records_from_field:
    - "lines"
    - "data"
  remain_original_record: true

record_expander:
  type: RecordExpander
  expand_records_from_field:
    - "sections"
    - "*"
    - "items"
  on_no_records: emit_parent
Attributes:
- expand_records_from_field: Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*).
- remain_original_record: If True, each expanded record will include the original parent record in an "original_record" field. Defaults to False.
- on_no_records: Behavior when expansion produces no records. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.
- config: The user-provided configuration as specified by the source's spec.
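The effect of `remain_original_record` and `on_no_records` can be sketched with a stripped-down function. The `expand` helper below is hypothetical: it takes a single top-level key instead of an interpolated path, uses plain strings instead of the `OnNoRecords` enum, and assumes every array item is a dictionary. It is meant only to show what each option changes in the output.

```python
import copy
from typing import Any, Dict, Iterator


def expand(
    record: Dict[str, Any],
    field: str,
    remain_original_record: bool = False,
    on_no_records: str = "skip",
) -> Iterator[Dict[str, Any]]:
    """Hypothetical single-key simplification of the expansion logic."""
    items = record.get(field)
    expanded_any = False
    if isinstance(items, list):
        for item in items:
            child = dict(item)  # shallow copy so the parent's item is not mutated
            if remain_original_record:
                child["original_record"] = copy.deepcopy(record)
            yield child
            expanded_any = True
    # Fall back to the parent only when nothing was emitted.
    if not expanded_any and on_no_records == "emit_parent":
        yield record


order = {"id": "o1", "lines": [{"sku": "A"}, {"sku": "B"}]}
empty = {"id": "o2", "lines": []}

plain = list(expand(order, "lines"))
with_parent = list(expand(order, "lines", remain_original_record=True))
skipped = list(expand(empty, "lines"))
kept = list(expand(empty, "lines", on_no_records="emit_parent"))
```

With these inputs, `plain` contains the two line items only, `with_parent` adds a copy of the whole order under `original_record`, `skipped` is empty, and `kept` contains the untouched parent record.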
RecordExpander(expand_records_from_field: Sequence[str], config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], remain_original_record: bool = False, on_no_records: OnNoRecords = <OnNoRecords.skip: 'skip'>)
def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]:
    def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]:
        """Expand a record by extracting items from a nested array field."""
        if not isinstance(record, Mapping):
            # If the input isn't a mapping, expansion can't proceed; yield as-is.
            yield record
            return

        if not self._expand_path:
            yield record
            return

        parent_record = record
        expand_path = [path.eval(self.config) for path in self._expand_path]
        expanded_any = False

        try:
            extracted_values = dpath.values(parent_record, expand_path)
        except KeyError:
            extracted_values = []

        for extracted in extracted_values:
            if not isinstance(extracted, list):
                continue
            items = extracted
            for item in items:
                if isinstance(item, dict):
                    expanded_record = dict(item)
                    self._apply_parent_context(parent_record, expanded_record)
                    yield expanded_record
                    expanded_any = True
                else:
                    if self.remain_original_record:
                        yield {
                            "value": item,
                            "original_record": copy.deepcopy(parent_record),
                        }
                    else:
                        yield item
                    expanded_any = True

        if not expanded_any and self.on_no_records == OnNoRecords.emit_parent:
            yield parent_record
Expand a record by extracting items from a nested array field.
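One detail of the source listing that the docstring does not spell out is how array items that are not dictionaries are handled: they are yielded as-is, or wrapped as `{"value": item, "original_record": ...}` when `remain_original_record` is set. The sketch below mirrors those per-item branches in a standalone function. `wrap_items` and `keep_parent` are hypothetical names introduced for illustration, and the function returns a list rather than yielding.

```python
import copy
from typing import Any, Dict, List


def wrap_items(parent: Dict[str, Any], items: List[Any], keep_parent: bool) -> List[Any]:
    """Hypothetical helper mirroring the per-item branches in expand_record."""
    out: List[Any] = []
    for item in items:
        if isinstance(item, dict):
            child = dict(item)
            if keep_parent:
                child["original_record"] = copy.deepcopy(parent)
            out.append(child)
        elif keep_parent:
            # Scalars have no keys, so the value is wrapped next to the parent copy.
            out.append({"value": item, "original_record": copy.deepcopy(parent)})
        else:
            out.append(item)
    return out


parent = {"id": 7, "tags": ["red", {"name": "blue"}]}
bare = wrap_items(parent, parent["tags"], keep_parent=False)
wrapped = wrap_items(parent, parent["tags"], keep_parent=True)
```

In this sketch `bare` keeps the string `"red"` unwrapped, while `wrapped` turns it into a dictionary with `value` and `original_record` keys; dictionary items gain only the `original_record` key.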