# airbyte.records

PyAirbyte Records module.
## Understanding record handling in PyAirbyte

PyAirbyte models record handling after Airbyte's "Destination V2" ("Dv2") record handling. This
includes the implementation details below.
### Field Name Normalization

1. PyAirbyte normalizes top-level record keys to lowercase, replacing spaces and hyphens with
   underscores.
2. PyAirbyte does not normalize nested keys on sub-properties.
For example, the following record:

```json
{
  "My-Field": "value",
  "Nested": {
    "MySubField": "value"
  }
}
```

would be normalized to:

```json
{
  "my_field": "value",
  "nested": {
    "MySubField": "value"
  }
}
```
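The top-level rule can be approximated in a few lines of Python. This is a minimal sketch for illustration only: `normalize_record_keys` is a hypothetical helper, not part of PyAirbyte, and PyAirbyte's internal normalizer may also handle other special characters.

```python
def normalize_record_keys(record: dict) -> dict:
    """Lowercase top-level keys, replacing spaces and hyphens with underscores."""
    return {
        key.lower().replace(" ", "_").replace("-", "_"): value  # nested keys untouched
        for key, value in record.items()
    }


print(normalize_record_keys({"My-Field": "value", "Nested": {"MySubField": "value"}}))
# {'my_field': 'value', 'nested': {'MySubField': 'value'}}
```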
### Table Name Normalization

Similar to column handling, PyAirbyte normalizes table names to the lowercase version of the
stream name and may remove or normalize special characters.
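For illustration, the same lowercase normalizer used for columns can be applied to a stream name. This is a sketch under assumptions: `LowerCaseNormalizer` lives in the private `airbyte._util.name_normalizers` module (imported by this module's own source, shown below), and its exact special-character handling may differ from the simple output suggested here.

```python
from airbyte._util.name_normalizers import LowerCaseNormalizer  # private module; may change

# Expected: 'my_stream_name' (lowercased, spaces and hyphens replaced with underscores)
print(LowerCaseNormalizer.normalize("My Stream-Name"))
```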
### Airbyte-Managed Metadata Columns

PyAirbyte adds the following columns to every record:

- `_airbyte_raw_id`: A unique identifier for the record.
- `_airbyte_extracted_at`: The time the record was extracted.
- `_airbyte_meta`: A dictionary of extra metadata about the record.

The names of these columns are included in the `airbyte.constants` module for programmatic
reference.
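For example (these constant names are the ones this module itself imports from `airbyte.constants`):

```python
from airbyte.constants import (
    AB_EXTRACTED_AT_COLUMN,
    AB_META_COLUMN,
    AB_RAW_ID_COLUMN,
)

# The actual column names, e.g. for filtering metadata columns out of a dataframe.
print(AB_RAW_ID_COLUMN, AB_EXTRACTED_AT_COLUMN, AB_META_COLUMN)
```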
## Schema Evolution

PyAirbyte supports a very basic form of schema evolution:

1. Columns are always auto-added to cache tables whenever newly arriving properties are detected
   as not present in the cache table.
2. Column types will not be modified or expanded to fit changed types in the source catalog.
   If column types change, we recommend users manually alter the column types.
3. At any time, users can run a full sync with a `WriteStrategy` of 'replace'. This will create a
   fresh table from scratch and then swap the old and new tables after the table sync is complete,
   as shown in the sketch below.
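A sketch of a full refresh using the 'replace' strategy. The `source-faker` connector and its config are placeholders; substitute your own source and cache.

```python
import airbyte as ab
from airbyte.strategies import WriteStrategy

# Placeholder connector and config for illustration.
source = ab.get_source("source-faker", config={"count": 100})
source.select_all_streams()

cache = ab.get_default_cache()

# Rebuild each stream's table from scratch; tables are swapped in when the sync completes.
result = source.read(cache=cache, write_strategy=WriteStrategy.REPLACE)
```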
---

Module header and imports (the full source for each class appears in its section below):

```python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Any

import pytz
from uuid_extensions import uuid7str

from airbyte._util.name_normalizers import LowerCaseNormalizer, NameNormalizerBase
from airbyte.constants import (
    AB_EXTRACTED_AT_COLUMN,
    AB_INTERNAL_COLUMNS,
    AB_META_COLUMN,
    AB_RAW_ID_COLUMN,
)


if TYPE_CHECKING:
    from collections.abc import Iterator

    from airbyte_protocol.models import (
        AirbyteRecordMessage,
    )
```
## class StreamRecordHandler

```python
class StreamRecordHandler:
    """A class to handle processing of StreamRecords.

    This is a long-lived object that can be used to process multiple StreamRecords, which
    saves on memory and processing time by reusing the same object for all records of the same
    type.
    """

    def __init__(
        self,
        *,
        json_schema: dict,
        normalizer: type[NameNormalizerBase] = LowerCaseNormalizer,
        normalize_keys: bool = False,
        prune_extra_fields: bool,
    ) -> None:
        """Initialize the dictionary with the given data.

        Args:
            json_schema: The JSON Schema definition for this stream.
            normalizer: The normalizer to use when normalizing keys. If not provided, the
                LowerCaseNormalizer will be used.
            normalize_keys: If `True`, the keys will be normalized using the given normalizer.
            prune_extra_fields: If `True`, unexpected fields will be removed.
        """
        self._expected_keys: list[str] = list(json_schema.get("properties", {}).keys())
        self._normalizer: type[NameNormalizerBase] = normalizer
        self._normalize_keys: bool = normalize_keys
        self.prune_extra_fields: bool = prune_extra_fields

        self.index_keys: list[str] = [
            self._normalizer.normalize(key) if self._normalize_keys else key
            for key in self._expected_keys
        ]
        self.normalized_keys: list[str] = [
            self._normalizer.normalize(key) for key in self._expected_keys
        ]
        self.quick_lookup: dict[str, str]

        for internal_col in AB_INTERNAL_COLUMNS:
            if internal_col not in self._expected_keys:
                self._expected_keys.append(internal_col)

        # Store a lookup from normalized keys to pretty cased (originally cased) keys.
        self._pretty_case_lookup: dict[str, str] = {
            self._normalizer.normalize(pretty_case.lower()): pretty_case
            for pretty_case in self._expected_keys
        }
        # Store a map from all key versions (normalized and pretty-cased) to their normalized
        # version.
        self.quick_lookup = {
            key: self._normalizer.normalize(key)
            if self._normalize_keys
            else self.to_display_case(key)
            for key in set(self._expected_keys) | set(self._pretty_case_lookup.values())
        }

    def to_display_case(self, key: str) -> str:
        """Return the original case of the key.

        If the key is not found in the pretty case lookup, return the key provided.
        """
        return self._pretty_case_lookup.get(self._normalizer.normalize(key), key)

    def to_index_case(self, key: str) -> str:
        """Return the internal case of the key.

        If `normalize_keys` is True, returns the normalized key.
        Otherwise, return the original case ("pretty case") of the key.
        """
        try:
            return self.quick_lookup[key]
        except KeyError:
            result = (
                self._normalizer.normalize(key)
                if self._normalize_keys
                else self.to_display_case(key)
            )
            self.quick_lookup[key] = result
            return result
```
A class to handle processing of StreamRecords.
This is a long-lived object that can be used to process multiple StreamRecords, which saves on memory and processing time by reusing the same object for all records of the same type.
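For example, a single handler might be constructed once per stream and reused for every record. This is a minimal sketch; the toy `json_schema` below is a placeholder, not a real catalog schema.

```python
from airbyte.records import StreamRecordHandler

# One long-lived handler per stream, reused for every record of that stream.
handler = StreamRecordHandler(
    json_schema={
        "properties": {
            "My-Field": {"type": "string"},
            "Nested": {"type": "object"},
        }
    },
    normalize_keys=True,
    prune_extra_fields=False,
)
print(handler.index_keys)  # ['my_field', 'nested']
```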
### StreamRecordHandler.__init__

Initialize the handler with the given data.

Arguments:

- `json_schema`: The JSON Schema definition for this stream.
- `normalizer`: The normalizer to use when normalizing keys. If not provided, the
  `LowerCaseNormalizer` will be used.
- `normalize_keys`: If `True`, the keys will be normalized using the given normalizer.
- `prune_extra_fields`: If `True`, unexpected fields will be removed.
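As an illustration of `prune_extra_fields`, a handler with pruning enabled drops any incoming field that is not present in the schema. A sketch with a hypothetical one-column schema:

```python
from airbyte.records import StreamRecord, StreamRecordHandler

pruning_handler = StreamRecordHandler(
    json_schema={"properties": {"id": {"type": "integer"}}},
    normalize_keys=True,
    prune_extra_fields=True,
)
record = StreamRecord(
    {"id": 1, "unexpected": "dropped"},
    stream_record_handler=pruning_handler,
)
assert "unexpected" not in record  # pruned: not in the stream's schema
assert record["id"] == 1
```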
### StreamRecordHandler.to_display_case

Return the original case of the key.

If the key is not found in the pretty case lookup, return the key provided.
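Continuing the `handler` sketch from the class example above:

```python
print(handler.to_display_case("my_field"))       # 'My-Field'
print(handler.to_display_case("not_in_schema"))  # 'not_in_schema' (returned as provided)
```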
### StreamRecordHandler.to_index_case

Return the internal case of the key.

If `normalize_keys` is `True`, return the normalized key. Otherwise, return the original case
("pretty case") of the key.
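Continuing the same sketch, with `normalize_keys=True` every case variant maps to the normalized key:

```python
print(handler.to_index_case("My-Field"))  # 'my_field'
print(handler.to_index_case("MY-FIELD"))  # 'my_field' (result is cached for reuse)
```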
## class StreamRecord

```python
class StreamRecord(dict[str, Any]):
    """The StreamRecord class is a case-aware, case-insensitive dictionary implementation.

    It has these behaviors:
    - When a key is retrieved, deleted, or checked for existence, it is always checked in a
      case-insensitive manner.
    - The original case is stored in a separate dictionary, so that the original case can be
      retrieved when needed.
    - Because it is subclassed from `dict`, the `StreamRecord` class can be passed as a normal
      Python dictionary.
    - In addition to the properties of the stream's records, the dictionary also stores the Airbyte
      metadata columns: `_airbyte_raw_id`, `_airbyte_extracted_at`, and `_airbyte_meta`.

    This behavior mirrors how a case-aware, case-insensitive SQL database would handle column
    references.

    There are two ways this class can store keys internally:
    - If normalize_keys is True, the keys are normalized using the given normalizer.
    - If normalize_keys is False, the original case of the keys is stored.

    In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the
    dictionary will be initialized with the given keys. If a key is not found in the input data, it
    will be initialized with a value of None. When provided, the 'expected_keys' input will also
    determine the original case of the keys.
    """

    def __init__(
        self,
        from_dict: dict,
        *,
        stream_record_handler: StreamRecordHandler,
        with_internal_columns: bool = True,
        extracted_at: datetime | None = None,
    ) -> None:
        """Initialize the dictionary with the given data.

        Args:
            from_dict: The dictionary to initialize the StreamRecord with.
            stream_record_handler: The StreamRecordHandler to use for processing the record.
            with_internal_columns: If `True`, the internal columns will be added to the record.
            extracted_at: The time the record was extracted. If not provided, the current time will
                be used.
        """
        self._stream_handler: StreamRecordHandler = stream_record_handler

        # Start by initializing all values to None
        self.update(dict.fromkeys(stream_record_handler.index_keys))

        # Update the dictionary with the given data
        if self._stream_handler.prune_extra_fields:
            self.update(
                {
                    self._stream_handler.to_index_case(k): v
                    for k, v in from_dict.items()
                    if self._stream_handler.to_index_case(k) in self._stream_handler.index_keys
                }
            )
        else:
            self.update({self._stream_handler.to_index_case(k): v for k, v in from_dict.items()})

        if with_internal_columns:
            self.update(
                {
                    AB_RAW_ID_COLUMN: uuid7str(),
                    AB_EXTRACTED_AT_COLUMN: extracted_at or datetime.now(pytz.utc),
                    AB_META_COLUMN: {},
                }
            )

    @classmethod
    def from_record_message(
        cls,
        record_message: AirbyteRecordMessage,
        *,
        stream_record_handler: StreamRecordHandler,
    ) -> StreamRecord:
        """Return a StreamRecord from a RecordMessage."""
        data_dict: dict[str, Any] = record_message.data.copy()
        return cls(
            from_dict=data_dict,
            stream_record_handler=stream_record_handler,
            with_internal_columns=True,
            extracted_at=datetime.fromtimestamp(record_message.emitted_at / 1000, tz=pytz.utc),
        )

    def __getitem__(self, key: str) -> Any:  # noqa: ANN401
        """Return the item with the given key."""
        try:
            return super().__getitem__(key)
        except KeyError:
            return super().__getitem__(self._stream_handler.to_index_case(key))

    def __setitem__(self, key: str, value: Any) -> None:  # noqa: ANN401
        """Set the item with the given key to the given value."""
        index_case_key = self._stream_handler.to_index_case(key)
        if (
            self._stream_handler.prune_extra_fields
            and index_case_key not in self._stream_handler.index_keys
        ):
            return

        super().__setitem__(index_case_key, value)

    def __delitem__(self, key: str) -> None:
        """Delete the item with the given key."""
        try:
            super().__delitem__(key)
        except KeyError:
            index_case_key = self._stream_handler.to_index_case(key)
            if super().__contains__(index_case_key):
                super().__delitem__(index_case_key)
                return
        else:
            # No failure. Key was deleted.
            return

        raise KeyError(key)

    def __contains__(self, key: object) -> bool:
        """Return whether the dictionary contains the given key."""
        assert isinstance(key, str), "Key must be a string."
        return super().__contains__(key) or super().__contains__(
            self._stream_handler.to_index_case(key)
        )

    def __iter__(self) -> Iterator[str]:
        """Return an iterator over the keys of the dictionary."""
        return iter(super().__iter__())

    def __len__(self) -> int:
        """Return the number of items in the dictionary."""
        return super().__len__()

    def __eq__(self, other: object) -> bool:
        """Return whether the StreamRecord is equal to the given dict or StreamRecord object."""
        if isinstance(other, StreamRecord):
            return dict(self) == dict(other)

        if isinstance(other, dict):
            return {k.lower(): v for k, v in self.items()} == {
                k.lower(): v for k, v in other.items()
            }
        return False

    def __hash__(self) -> int:  # type: ignore [override]  # Doesn't match superclass (dict)
        """Return the hash of the dictionary with keys sorted."""
        items = [(k, v) for k, v in self.items() if not isinstance(v, dict)]
        return hash(tuple(sorted(items)))
```
The StreamRecord class is a case-aware, case-insensitive dictionary implementation.

It has these behaviors:

- When a key is retrieved, deleted, or checked for existence, it is always checked in a
  case-insensitive manner.
- The original case is stored in a separate dictionary, so that the original case can be
  retrieved when needed.
- Because it is subclassed from `dict`, the `StreamRecord` class can be passed as a normal
  Python dictionary.
- In addition to the properties of the stream's records, the dictionary also stores the
  Airbyte metadata columns: `_airbyte_raw_id`, `_airbyte_extracted_at`, and `_airbyte_meta`.

This behavior mirrors how a case-aware, case-insensitive SQL database would handle column
references.

There are two ways this class can store keys internally:

- If `normalize_keys` is `True`, the keys are normalized using the given normalizer.
- If `normalize_keys` is `False`, the original case of the keys is stored.

Regarding missing values: the dictionary accepts an `expected_keys` input. When set, the
dictionary will be initialized with the given keys. If a key is not found in the input data, it
will be initialized with a value of `None`. When provided, the `expected_keys` input will also
determine the original case of the keys.
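A short sketch of the case-insensitive behavior. The literal column name `_airbyte_raw_id` is assumed here to match `airbyte.constants.AB_RAW_ID_COLUMN`; in doubt, compare against the constant itself.

```python
from airbyte.records import StreamRecord, StreamRecordHandler

handler = StreamRecordHandler(
    json_schema={"properties": {"My-Field": {"type": "string"}}},
    normalize_keys=True,
    prune_extra_fields=False,
)
record = StreamRecord({"My-Field": "value"}, stream_record_handler=handler)

# Keys resolve case-insensitively, and internal columns are added automatically.
assert record["my_field"] == record["My-Field"] == "value"
assert "_airbyte_raw_id" in record  # assuming the default internal column name
```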
### StreamRecord.from_record_message

Return a `StreamRecord` from a `RecordMessage`.
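A sketch of building a `StreamRecord` directly from a protocol message. The `AirbyteRecordMessage` fields shown (`stream`, `data`, `emitted_at`) come from the Airbyte protocol models; `emitted_at` is epoch milliseconds, which `from_record_message` converts to a UTC datetime.

```python
from airbyte_protocol.models import AirbyteRecordMessage

from airbyte.records import StreamRecord, StreamRecordHandler

handler = StreamRecordHandler(
    json_schema={"properties": {"id": {"type": "integer"}}},
    normalize_keys=True,
    prune_extra_fields=False,
)
message = AirbyteRecordMessage(
    stream="users",
    data={"id": 1},
    emitted_at=1700000000000,  # epoch milliseconds, per the Airbyte protocol
)
record = StreamRecord.from_record_message(message, stream_record_handler=handler)
print(record["_airbyte_extracted_at"])  # UTC datetime parsed from `emitted_at`
```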