airbyte.records
PyAirbyte Records module.
Understanding record handling in PyAirbyte
PyAirbyte models record handling after Airbyte's "Destination V2" ("Dv2") record handling. This includes the implementation details below.
Field Name Normalization
- PyAirbyte normalizes top-level record keys to lowercase, replacing spaces and hyphens with underscores.
- PyAirbyte does not normalize nested keys on sub-properties.
For example, the following record:
```json
{
  "My-Field": "value",
  "Nested": {
    "MySubField": "value"
  }
}
```

Would be normalized to:

```json
{
  "my_field": "value",
  "nested": {
    "MySubField": "value"
  }
}
```
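The top-level rule can be sketched as follows. This is a minimal illustration of the documented behavior, not PyAirbyte's actual `LowerCaseNormalizer` implementation:

```python
def normalize_top_level_keys(record: dict) -> dict:
    """Lowercase top-level keys and replace spaces and hyphens with underscores.

    Nested dictionaries are left untouched, mirroring PyAirbyte's rule of
    not normalizing keys on sub-properties.
    """

    def normalize(key: str) -> str:
        return key.lower().replace(" ", "_").replace("-", "_")

    return {normalize(key): value for key, value in record.items()}


record = {"My-Field": "value", "Nested": {"MySubField": "value"}}
normalized = normalize_top_level_keys(record)
# → {"my_field": "value", "nested": {"MySubField": "value"}}
```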
Table Name Normalization
Similar to column handling, PyAirbyte normalizes table names to the lowercase version of the stream name and may remove or normalize special characters.
Airbyte-Managed Metadata Columns
PyAirbyte adds the following columns to every record:
- `ab_raw_id`: A unique identifier for the record.
- `ab_extracted_at`: The time the record was extracted.
- `ab_meta`: A dictionary of extra metadata about the record.
The names of these columns are included in the airbyte.constants module for programmatic
reference.
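As an illustration of the shape of an enriched record, a source record plus the three metadata columns might look like the sketch below. The stdlib `uuid4` here is a stand-in: PyAirbyte actually generates UUIDv7 identifiers via `uuid_extensions.uuid7str`, and the exact column names should be read from `airbyte.constants` rather than hard-coded:

```python
from datetime import datetime, timezone
from uuid import uuid4

source_record = {"id": 1, "name": "Alice"}

# Metadata columns are appended alongside the source fields.
# (uuid4 is a stand-in; PyAirbyte uses UUIDv7 via `uuid_extensions.uuid7str`.)
enriched = {
    **source_record,
    "ab_raw_id": str(uuid4()),
    "ab_extracted_at": datetime.now(timezone.utc),
    "ab_meta": {},
}
```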
Schema Evolution
PyAirbyte supports a very basic form of schema evolution:
- Columns are always auto-added to cache tables whenever newly arriving properties are detected as not present in the cache table.
- Column types will not be modified or expanded to fit changed types in the source catalog.
  - If column types change, we recommend users manually alter the column types.
- At any time, users can run a full sync with a `WriteStrategy` of 'replace'. This will create a fresh table from scratch and then swap the old and new tables after the table sync is complete.
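The column auto-add behavior amounts to a set difference between newly arriving properties and the columns already present in the cache table. A rough sketch, using hypothetical names rather than PyAirbyte's internal API:

```python
def columns_to_add(incoming_properties: set[str], cache_columns: set[str]) -> set[str]:
    """Return properties seen in arriving records that are missing from the cache table."""
    return incoming_properties - cache_columns


# A newly arriving "email" property would be auto-added;
# existing columns and their types stay as-is.
missing = columns_to_add({"id", "name", "email"}, {"id", "name"})
# → {"email"}
```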
````python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte Records module.

## Understanding record handling in PyAirbyte

PyAirbyte models record handling after Airbyte's "Destination V2" ("Dv2") record handling. This
includes the implementation details below.

### Field Name Normalization

1. PyAirbyte normalizes top-level record keys to lowercase, replacing spaces and hyphens with
   underscores.
2. PyAirbyte does not normalize nested keys on sub-properties.

For example, the following record:

```json
{
    "My-Field": "value",
    "Nested": {
        "MySubField": "value"
    }
}
```

Would be normalized to:

```json
{
    "my_field": "value",
    "nested": {
        "MySubField": "value"
    }
}
```

### Table Name Normalization

Similar to column handling, PyAirbyte normalizes table names to the lowercase version of the stream
name and may remove or normalize special characters.

### Airbyte-Managed Metadata Columns

PyAirbyte adds the following columns to every record:

- `ab_raw_id`: A unique identifier for the record.
- `ab_extracted_at`: The time the record was extracted.
- `ab_meta`: A dictionary of extra metadata about the record.

The names of these columns are included in the `airbyte.constants` module for programmatic
reference.

## Schema Evolution

PyAirbyte supports a very basic form of schema evolution:

1. Columns are always auto-added to cache tables whenever newly arriving properties are detected
   as not present in the cache table.
2. Column types will not be modified or expanded to fit changed types in the source catalog.
   - If column types change, we recommend users manually alter the column types.
3. At any time, users can run a full sync with a `WriteStrategy` of 'replace'. This will create a
   fresh table from scratch and then swap the old and new tables after table sync is complete.

---

"""

from __future__ import annotations

from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any

from uuid_extensions import uuid7str

from airbyte._util.name_normalizers import LowerCaseNormalizer, NameNormalizerBase
from airbyte.constants import (
    AB_EXTRACTED_AT_COLUMN,
    AB_INTERNAL_COLUMNS,
    AB_META_COLUMN,
    AB_RAW_ID_COLUMN,
)


if TYPE_CHECKING:
    from collections.abc import Iterator

    from airbyte_protocol.models import (
        AirbyteRecordMessage,
    )


class StreamRecordHandler:
    """A class to handle processing of StreamRecords.

    This is a long-lived object that can be used to process multiple StreamRecords, which
    saves on memory and processing time by reusing the same object for all records of the same type.
    """

    def __init__(
        self,
        *,
        json_schema: dict,
        normalizer: type[NameNormalizerBase] = LowerCaseNormalizer,
        normalize_keys: bool = False,
        prune_extra_fields: bool,
    ) -> None:
        """Initialize the dictionary with the given data.

        Args:
            json_schema: The JSON Schema definition for this stream.
            normalizer: The normalizer to use when normalizing keys. If not provided, the
                LowerCaseNormalizer will be used.
            normalize_keys: If `True`, the keys will be normalized using the given normalizer.
            prune_extra_fields: If `True`, unexpected fields will be removed.
        """
        self._expected_keys: list[str] = list(json_schema.get("properties", {}).keys())
        self._normalizer: type[NameNormalizerBase] = normalizer
        self._normalize_keys: bool = normalize_keys
        self.prune_extra_fields: bool = prune_extra_fields

        self.index_keys: list[str] = [
            self._normalizer.normalize(key) if self._normalize_keys else key
            for key in self._expected_keys
        ]
        self.normalized_keys: list[str] = [
            self._normalizer.normalize(key) for key in self._expected_keys
        ]
        self.quick_lookup: dict[str, str]

        for internal_col in AB_INTERNAL_COLUMNS:
            if internal_col not in self._expected_keys:
                self._expected_keys.append(internal_col)

        # Store a lookup from normalized keys to pretty cased (originally cased) keys.
        self._pretty_case_lookup: dict[str, str] = {
            self._normalizer.normalize(pretty_case.lower()): pretty_case
            for pretty_case in self._expected_keys
        }
        # Store a map from all key versions (normalized and pretty-cased) to their normalized
        # version.
        self.quick_lookup = {
            key: self._normalizer.normalize(key)
            if self._normalize_keys
            else self.to_display_case(key)
            for key in set(self._expected_keys) | set(self._pretty_case_lookup.values())
        }

    def to_display_case(self, key: str) -> str:
        """Return the original case of the key.

        If the key is not found in the pretty case lookup, return the key provided.
        """
        return self._pretty_case_lookup.get(self._normalizer.normalize(key), key)

    def to_index_case(self, key: str) -> str:
        """Return the internal case of the key.

        If `normalize_keys` is True, returns the normalized key.
        Otherwise, return the original case ("pretty case") of the key.
        """
        try:
            return self.quick_lookup[key]
        except KeyError:
            result = (
                self._normalizer.normalize(key)
                if self._normalize_keys
                else self.to_display_case(key)
            )
            self.quick_lookup[key] = result
            return result


class StreamRecord(dict[str, Any]):
    """The StreamRecord class is a case-aware, case-insensitive dictionary implementation.

    It has these behaviors:
    - When a key is retrieved, deleted, or checked for existence, it is always checked in a
      case-insensitive manner.
    - The original case is stored in a separate dictionary, so that the original case can be
      retrieved when needed.
    - Because it is subclassed from `dict`, the `StreamRecord` class can be passed as a normal
      Python dictionary.
    - In addition to the properties of the stream's records, the dictionary also stores the Airbyte
      metadata columns: `_airbyte_raw_id`, `_airbyte_extracted_at`, and `_airbyte_meta`.

    This behavior mirrors how a case-aware, case-insensitive SQL database would handle column
    references.

    There are two ways this class can store keys internally:
    - If normalize_keys is True, the keys are normalized using the given normalizer.
    - If normalize_keys is False, the original case of the keys is stored.

    In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the
    dictionary will be initialized with the given keys. If a key is not found in the input data, it
    will be initialized with a value of None. When provided, the 'expected_keys' input will also
    determine the original case of the keys.
    """

    def __init__(
        self,
        from_dict: dict,
        *,
        stream_record_handler: StreamRecordHandler,
        with_internal_columns: bool = True,
        extracted_at: datetime | None = None,
    ) -> None:
        """Initialize the dictionary with the given data.

        Args:
            from_dict: The dictionary to initialize the StreamRecord with.
            stream_record_handler: The StreamRecordHandler to use for processing the record.
            with_internal_columns: If `True`, the internal columns will be added to the record.
            extracted_at: The time the record was extracted. If not provided, the current time will
                be used.
        """
        self._stream_handler: StreamRecordHandler = stream_record_handler

        # Start by initializing all values to None
        self.update(dict.fromkeys(stream_record_handler.index_keys))

        # Update the dictionary with the given data
        if self._stream_handler.prune_extra_fields:
            self.update(
                {
                    self._stream_handler.to_index_case(k): v
                    for k, v in from_dict.items()
                    if self._stream_handler.to_index_case(k) in self._stream_handler.index_keys
                }
            )
        else:
            self.update({self._stream_handler.to_index_case(k): v for k, v in from_dict.items()})

        if with_internal_columns:
            self.update(
                {
                    AB_RAW_ID_COLUMN: uuid7str(),
                    AB_EXTRACTED_AT_COLUMN: extracted_at or datetime.now(timezone.utc),
                    AB_META_COLUMN: {},
                }
            )

    @classmethod
    def from_record_message(
        cls,
        record_message: AirbyteRecordMessage,
        *,
        stream_record_handler: StreamRecordHandler,
    ) -> StreamRecord:
        """Return a StreamRecord from a RecordMessage."""
        data_dict: dict[str, Any] = record_message.data.copy()
        return cls(
            from_dict=data_dict,
            stream_record_handler=stream_record_handler,
            with_internal_columns=True,
            extracted_at=datetime.fromtimestamp(record_message.emitted_at / 1000, tz=timezone.utc),
        )

    def __getitem__(self, key: str) -> Any:  # noqa: ANN401
        """Return the item with the given key."""
        try:
            return super().__getitem__(key)
        except KeyError:
            return super().__getitem__(self._stream_handler.to_index_case(key))

    def __setitem__(self, key: str, value: Any) -> None:  # noqa: ANN401
        """Set the item with the given key to the given value."""
        index_case_key = self._stream_handler.to_index_case(key)
        if (
            self._stream_handler.prune_extra_fields
            and index_case_key not in self._stream_handler.index_keys
        ):
            return

        super().__setitem__(index_case_key, value)

    def __delitem__(self, key: str) -> None:
        """Delete the item with the given key."""
        try:
            super().__delitem__(key)
        except KeyError:
            index_case_key = self._stream_handler.to_index_case(key)
            if super().__contains__(index_case_key):
                super().__delitem__(index_case_key)
                return
        else:
            # No failure. Key was deleted.
            return

        raise KeyError(key)

    def __contains__(self, key: object) -> bool:
        """Return whether the dictionary contains the given key."""
        assert isinstance(key, str), "Key must be a string."
        return super().__contains__(key) or super().__contains__(
            self._stream_handler.to_index_case(key)
        )

    def __iter__(self) -> Iterator[str]:
        """Return an iterator over the keys of the dictionary."""
        return iter(super().__iter__())

    def __len__(self) -> int:
        """Return the number of items in the dictionary."""
        return super().__len__()

    def __eq__(self, other: object) -> bool:
        """Return whether the StreamRecord is equal to the given dict or StreamRecord object."""
        if isinstance(other, StreamRecord):
            return dict(self) == dict(other)

        if isinstance(other, dict):
            return {k.lower(): v for k, v in self.items()} == {
                k.lower(): v for k, v in other.items()
            }
        return False

    def __hash__(self) -> int:  # type: ignore [override]  # Doesn't match superclass (dict)
        """Return the hash of the dictionary with keys sorted."""
        items = [(k, v) for k, v in self.items() if not isinstance(v, dict)]
        return hash(tuple(sorted(items)))
````
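The case-insensitive lookup behavior of `StreamRecord` can be illustrated with a stripped-down, stdlib-only analogue. This is not the real class, which additionally supports pluggable normalizers, field pruning, and the Airbyte metadata columns; it only shows the core idea of checking keys case-insensitively while subclassing `dict`:

```python
from typing import Any


class CaseInsensitiveDict(dict[str, Any]):
    """A minimal analogue of StreamRecord's case-insensitive key handling."""

    def __setitem__(self, key: str, value: Any) -> None:
        # Store every key in a single canonical (lowercase) form.
        super().__setitem__(key.lower(), value)

    def __getitem__(self, key: str) -> Any:
        # Lookups succeed regardless of the casing used by the caller.
        return super().__getitem__(key.lower())

    def __contains__(self, key: object) -> bool:
        return isinstance(key, str) and super().__contains__(key.lower())


rec = CaseInsensitiveDict()
rec["My-Field"] = "value"
assert rec["my-field"] == "value"
assert "MY-FIELD" in rec
```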