airbyte.records
PyAirbyte Records module.
Understanding record handling in PyAirbyte
PyAirbyte models record handling after Airbyte's "Destination V2" ("Dv2") record handling. This includes the below implementation details.
Field Name Normalization
- PyAirbyte normalizes top-level record keys to lowercase, replacing spaces and hyphens with underscores.
- PyAirbyte does not normalize nested keys on sub-properties.
For example, the following record:
{
"My-Field": "value",
"Nested": {
"MySubField": "value"
}
}
Would be normalized to:
{
"my_field": "value",
"nested": {
"MySubField": "value"
}
}
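The rule above can be sketched in a few lines of Python. This is a minimal illustration, not PyAirbyte's own normalizer: the function name `normalize_top_level_keys` is hypothetical, and it covers only the lowercase, space, and hyphen handling described here.

```python
from typing import Any


def normalize_top_level_keys(record: dict[str, Any]) -> dict[str, Any]:
    """Lowercase top-level keys and replace spaces and hyphens with underscores.

    Hypothetical helper for illustration only; nested values are left untouched.
    """
    return {
        key.lower().replace(" ", "_").replace("-", "_"): value
        for key, value in record.items()
    }


record = {"My-Field": "value", "Nested": {"MySubField": "value"}}
normalized = normalize_top_level_keys(record)
print(normalized)
```

Because only the outer dictionary is rebuilt, the nested `MySubField` key keeps its original spelling.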
Table Name Normalization
Similar to column handling, PyAirbyte normalizes table names to the lowercase version of the stream name and may remove or normalize special characters.
Airbyte-Managed Metadata Columns
PyAirbyte adds the following columns to every record:
- ab_raw_id: A unique identifier for the record.
- ab_extracted_at: The time the record was extracted.
- ab_meta: A dictionary of extra metadata about the record.
The names of these columns are included in the airbyte.constants module for programmatic reference.
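As a rough sketch of what adding these columns might look like, the snippet below attaches the three metadata values to a plain dictionary. The constant names and the helper `with_metadata_columns` are assumptions made for illustration, the column names are simply copied from the list above, and `uuid.uuid4()` stands in for whatever ID generator the library actually uses. In real code, prefer the names exported by `airbyte.constants`.

```python
import uuid
from datetime import datetime, timezone
from typing import Any

# Assumed column names, copied from the list above (placeholders only).
RAW_ID_COLUMN = "ab_raw_id"
EXTRACTED_AT_COLUMN = "ab_extracted_at"
META_COLUMN = "ab_meta"


def with_metadata_columns(record: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of the record with the three metadata columns added."""
    return {
        **record,
        RAW_ID_COLUMN: str(uuid.uuid4()),  # stand-in for a unique record ID
        EXTRACTED_AT_COLUMN: datetime.now(timezone.utc),  # timezone-aware UTC time
        META_COLUMN: {},  # empty dictionary for extra metadata
    }


enriched = with_metadata_columns({"my_field": "value"})
print(sorted(enriched))
```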
Schema Evolution
PyAirbyte supports a very basic form of schema evolution:
- Columns are always auto-added to cache tables whenever newly arriving properties are detected as not present in the cache table.
- Column types will not be modified or expanded to fit changed types in the source catalog.
- If column types change, we recommend that users manually alter the column types.
- At any time, users can run a full sync with a WriteStrategy of 'replace'. This will create a fresh table from scratch and then swap the old and new tables after table sync is complete.
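The "auto-add columns, never alter types" behavior can be illustrated with an in-memory SQLite table. This is a sketch under stated assumptions, not PyAirbyte's cache implementation: the helper `add_missing_columns` is hypothetical, every new column is created as `TEXT` for simplicity, and table and column names are assumed to be trusted identifiers.

```python
import sqlite3


def add_missing_columns(
    conn: sqlite3.Connection, table: str, record_keys: list[str]
) -> list[str]:
    """Add a TEXT column for every record key the table does not have yet."""
    # PRAGMA table_info returns one row per column; index 1 is the column name.
    existing = {row[1] for row in conn.execute(f'PRAGMA table_info("{table}")')}
    added = [key for key in record_keys if key not in existing]
    for column in added:
        # Existing columns are left alone: their types are never altered here.
        conn.execute(f'ALTER TABLE "{table}" ADD COLUMN "{column}" TEXT')
    return added


conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE users ("id" INTEGER, "name" TEXT)')
added = add_missing_columns(conn, "users", ["id", "name", "email"])
print(added)
```

Only the previously unseen `email` property results in a new column; `id` and `name` keep their existing definitions.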
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte Records module.

## Understanding record handling in PyAirbyte

PyAirbyte models record handling after Airbyte's "Destination V2" ("Dv2") record handling. This
includes the below implementation details.

### Field Name Normalization

1. PyAirbyte normalizes top-level record keys to lowercase, replacing spaces and hyphens with
   underscores.
2. PyAirbyte does not normalize nested keys on sub-properties.

For example, the following record:

```json
{
    "My-Field": "value",
    "Nested": {
        "MySubField": "value"
    }
}
```

Would be normalized to:

```json
{
    "my_field": "value",
    "nested": {
        "MySubField": "value"
    }
}
```

### Table Name Normalization

Similar to column handling, PyAirbyte normalizes table names to the lowercase version of the stream
name and may remove or normalize special characters.

### Airbyte-Managed Metadata Columns

PyAirbyte adds the following columns to every record:

- `ab_raw_id`: A unique identifier for the record.
- `ab_extracted_at`: The time the record was extracted.
- `ab_meta`: A dictionary of extra metadata about the record.

The names of these columns are included in the `airbyte.constants` module for programmatic
reference.

## Schema Evolution

PyAirbyte supports a very basic form of schema evolution:

1. Columns are always auto-added to cache tables whenever newly arriving properties are detected
   as not present in the cache table.
2. Column types will not be modified or expanded to fit changed types in the source catalog.
    - If column types change, we recommend user to manually alter the column types.
3. At any time, users can run a full sync with a `WriteStrategy` of 'replace'. This will create a
   fresh table from scratch and then swap the old and new tables after table sync is complete.

---

"""

from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Any

import pytz
from uuid_extensions import uuid7str

from airbyte._util.name_normalizers import LowerCaseNormalizer, NameNormalizerBase
from airbyte.constants import (
    AB_EXTRACTED_AT_COLUMN,
    AB_INTERNAL_COLUMNS,
    AB_META_COLUMN,
    AB_RAW_ID_COLUMN,
)


if TYPE_CHECKING:
    from collections.abc import Iterator

    from airbyte_protocol.models import (
        AirbyteRecordMessage,
    )


class StreamRecordHandler:
    """A class to handle processing of StreamRecords.

    This is a long-lived object that can be used to process multiple StreamRecords, which
    saves on memory and processing time by reusing the same object for all records of the same type.
    """

    def __init__(
        self,
        *,
        json_schema: dict,
        normalizer: type[NameNormalizerBase] = LowerCaseNormalizer,
        normalize_keys: bool = False,
        prune_extra_fields: bool,
    ) -> None:
        """Initialize the dictionary with the given data.

        Args:
            json_schema: The JSON Schema definition for this stream.
            normalizer: The normalizer to use when normalizing keys. If not provided, the
                LowerCaseNormalizer will be used.
            normalize_keys: If `True`, the keys will be normalized using the given normalizer.
            prune_extra_fields: If `True`, unexpected fields will be removed.
        """
        self._expected_keys: list[str] = list(json_schema.get("properties", {}).keys())
        self._normalizer: type[NameNormalizerBase] = normalizer
        self._normalize_keys: bool = normalize_keys
        self.prune_extra_fields: bool = prune_extra_fields

        self.index_keys: list[str] = [
            self._normalizer.normalize(key) if self._normalize_keys else key
            for key in self._expected_keys
        ]
        self.normalized_keys: list[str] = [
            self._normalizer.normalize(key) for key in self._expected_keys
        ]
        self.quick_lookup: dict[str, str]

        for internal_col in AB_INTERNAL_COLUMNS:
            if internal_col not in self._expected_keys:
                self._expected_keys.append(internal_col)

        # Store a lookup from normalized keys to pretty cased (originally cased) keys.
        self._pretty_case_lookup: dict[str, str] = {
            self._normalizer.normalize(pretty_case.lower()): pretty_case
            for pretty_case in self._expected_keys
        }
        # Store a map from all key versions (normalized and pretty-cased) to their normalized
        # version.
        self.quick_lookup = {
            key: self._normalizer.normalize(key)
            if self._normalize_keys
            else self.to_display_case(key)
            for key in set(self._expected_keys) | set(self._pretty_case_lookup.values())
        }

    def to_display_case(self, key: str) -> str:
        """Return the original case of the key."""
        return self._pretty_case_lookup[self._normalizer.normalize(key)]

    def to_index_case(self, key: str) -> str:
        """Return the internal case of the key.

        If `normalize_keys` is True, returns the normalized key.
        Otherwise, return the original case ("pretty case") of the key.
        """
        try:
            return self.quick_lookup[key]
        except KeyError:
            result = (
                self._normalizer.normalize(key)
                if self._normalize_keys
                else self.to_display_case(key)
            )
            self.quick_lookup[key] = result
            return result


class StreamRecord(dict[str, Any]):
    """The StreamRecord class is a case-aware, case-insensitive dictionary implementation.

    It has these behaviors:
    - When a key is retrieved, deleted, or checked for existence, it is always checked in a
      case-insensitive manner.
    - The original case is stored in a separate dictionary, so that the original case can be
      retrieved when needed.
    - Because it is subclassed from `dict`, the `StreamRecord` class can be passed as a normal
      Python dictionary.
    - In addition to the properties of the stream's records, the dictionary also stores the Airbyte
      metadata columns: `_airbyte_raw_id`, `_airbyte_extracted_at`, and `_airbyte_meta`.

    This behavior mirrors how a case-aware, case-insensitive SQL database would handle column
    references.

    There are two ways this class can store keys internally:
    - If normalize_keys is True, the keys are normalized using the given normalizer.
    - If normalize_keys is False, the original case of the keys is stored.

    In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the
    dictionary will be initialized with the given keys. If a key is not found in the input data, it
    will be initialized with a value of None. When provided, the 'expected_keys' input will also
    determine the original case of the keys.
    """

    def __init__(
        self,
        from_dict: dict,
        *,
        stream_record_handler: StreamRecordHandler,
        with_internal_columns: bool = True,
        extracted_at: datetime | None = None,
    ) -> None:
        """Initialize the dictionary with the given data.

        Args:
            from_dict: The dictionary to initialize the StreamRecord with.
            stream_record_handler: The StreamRecordHandler to use for processing the record.
            with_internal_columns: If `True`, the internal columns will be added to the record.
            extracted_at: The time the record was extracted. If not provided, the current time will
                be used.
        """
        self._stream_handler: StreamRecordHandler = stream_record_handler

        # Start by initializing all values to None
        self.update(dict.fromkeys(stream_record_handler.index_keys))

        # Update the dictionary with the given data
        if self._stream_handler.prune_extra_fields:
            self.update(
                {
                    self._stream_handler.to_index_case(k): v
                    for k, v in from_dict.items()
                    if self._stream_handler.to_index_case(k) in self._stream_handler.index_keys
                }
            )
        else:
            self.update({self._stream_handler.to_index_case(k): v for k, v in from_dict.items()})

        if with_internal_columns:
            self.update(
                {
                    AB_RAW_ID_COLUMN: uuid7str(),
                    AB_EXTRACTED_AT_COLUMN: extracted_at or datetime.now(pytz.utc),
                    AB_META_COLUMN: {},
                }
            )

    @classmethod
    def from_record_message(
        cls,
        record_message: AirbyteRecordMessage,
        *,
        stream_record_handler: StreamRecordHandler,
    ) -> StreamRecord:
        """Return a StreamRecord from a RecordMessage."""
        data_dict: dict[str, Any] = record_message.data.copy()
        return cls(
            from_dict=data_dict,
            stream_record_handler=stream_record_handler,
            with_internal_columns=True,
            extracted_at=datetime.fromtimestamp(record_message.emitted_at / 1000, tz=pytz.utc),
        )

    def __getitem__(self, key: str) -> Any:  # noqa: ANN401
        """Return the item with the given key."""
        try:
            return super().__getitem__(key)
        except KeyError:
            return super().__getitem__(self._stream_handler.to_index_case(key))

    def __setitem__(self, key: str, value: Any) -> None:  # noqa: ANN401
        """Set the item with the given key to the given value."""
        index_case_key = self._stream_handler.to_index_case(key)
        if (
            self._stream_handler.prune_extra_fields
            and index_case_key not in self._stream_handler.index_keys
        ):
            return

        super().__setitem__(index_case_key, value)

    def __delitem__(self, key: str) -> None:
        """Delete the item with the given key."""
        try:
            super().__delitem__(key)
        except KeyError:
            index_case_key = self._stream_handler.to_index_case(key)
            if super().__contains__(index_case_key):
                super().__delitem__(index_case_key)
                return
        else:
            # No failure. Key was deleted.
            return

        raise KeyError(key)

    def __contains__(self, key: object) -> bool:
        """Return whether the dictionary contains the given key."""
        assert isinstance(key, str), "Key must be a string."
        return super().__contains__(key) or super().__contains__(
            self._stream_handler.to_index_case(key)
        )

    def __iter__(self) -> Iterator[str]:
        """Return an iterator over the keys of the dictionary."""
        return iter(super().__iter__())

    def __len__(self) -> int:
        """Return the number of items in the dictionary."""
        return super().__len__()

    def __eq__(self, other: object) -> bool:
        """Return whether the StreamRecord is equal to the given dict or StreamRecord object."""
        if isinstance(other, StreamRecord):
            return dict(self) == dict(other)

        if isinstance(other, dict):
            return {k.lower(): v for k, v in self.items()} == {
                k.lower(): v for k, v in other.items()
            }
        return False

    def __hash__(self) -> int:  # type: ignore [override]  # Doesn't match superclass (dict)
        """Return the hash of the dictionary with keys sorted."""
        items = [(k, v) for k, v in self.items() if not isinstance(v, dict)]
        return hash(tuple(sorted(items)))
class StreamRecordHandler:
A class to handle processing of StreamRecords.
This is a long-lived object that can be used to process multiple StreamRecords, which saves on memory and processing time by reusing the same object for all records of the same type.
StreamRecordHandler(*, json_schema: dict, normalizer: type[NameNormalizerBase] = LowerCaseNormalizer, normalize_keys: bool = False, prune_extra_fields: bool)
Initialize the handler with the stream's schema and key-handling options.
Arguments:
- json_schema: The JSON Schema definition for this stream.
- normalizer: The normalizer to use when normalizing keys. If not provided, the LowerCaseNormalizer will be used.
- normalize_keys: If True, the keys will be normalized using the given normalizer.
- prune_extra_fields: If True, unexpected fields will be removed.
def to_display_case(self, key: str) -> str:
Return the original case of the key.
def to_index_case(self, key: str) -> str:
Return the internal case of the key.
If normalize_keys is True, return the normalized key.
Otherwise, return the original case ("pretty case") of the key.
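The lookup-with-cache pattern behind this method can be sketched as follows. The class `KeyLookup` is a hypothetical, simplified stand-in: plain lowercasing replaces the configurable normalizer, and only the caching and the two key modes are modeled.

```python
class KeyLookup:
    """Hypothetical, simplified stand-in for the handler's key lookup."""

    def __init__(self, expected_keys: list[str], *, normalize_keys: bool) -> None:
        self._normalize_keys = normalize_keys
        # Map lowercase keys back to their original ("pretty") case.
        self._pretty_case = {key.lower(): key for key in expected_keys}
        self._cache: dict[str, str] = {}

    def to_index_case(self, key: str) -> str:
        """Return the cached index-case key, computing it on first use."""
        if key not in self._cache:
            self._cache[key] = (
                key.lower() if self._normalize_keys else self._pretty_case[key.lower()]
            )
        return self._cache[key]


pretty = KeyLookup(["UserId"], normalize_keys=False)
lowered = KeyLookup(["UserId"], normalize_keys=True)
print(pretty.to_index_case("userid"), lowered.to_index_case("USERID"))
```

In this sketch, an unknown key raises `KeyError` when `normalize_keys` is False, because there is no original case to fall back on.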
class StreamRecord(dict[str, Any]):
The StreamRecord class is a case-aware, case-insensitive dictionary implementation.
It has these behaviors:
- When a key is retrieved, deleted, or checked for existence, it is always checked in a case-insensitive manner.
- The original case is stored in a separate dictionary, so that the original case can be retrieved when needed.
- Because it is subclassed from dict, the StreamRecord class can be passed as a normal Python dictionary.
- In addition to the properties of the stream's records, the dictionary also stores the Airbyte metadata columns: _airbyte_raw_id, _airbyte_extracted_at, and _airbyte_meta.
This behavior mirrors how a case-aware, case-insensitive SQL database would handle column references.
There are two ways this class can store keys internally:
- If normalize_keys is True, the keys are normalized using the given normalizer.
- If normalize_keys is False, the original case of the keys is stored.
With regard to missing values, the dictionary is initialized with the expected keys that the StreamRecordHandler derives from the stream's JSON schema. If a key is not found in the input data, it is initialized with a value of None. The expected keys also determine the original case of the keys.
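The behaviors described above can be illustrated with a much smaller dictionary subclass. This is a hypothetical sketch, not the `StreamRecord` implementation: the class `CaseInsensitiveRecord` takes its expected keys directly, always keeps the original case, and omits pruning, deletion, and the metadata columns.

```python
from typing import Any


class CaseInsensitiveRecord(dict):
    """Hypothetical, simplified sketch of a case-insensitive record."""

    def __init__(self, data: dict[str, Any], expected_keys: list[str]) -> None:
        super().__init__()
        # Lowercase -> original case, taken from the expected keys.
        self._original_case = {key.lower(): key for key in expected_keys}
        # Start with every expected key set to None, then overlay the data.
        for key in expected_keys:
            super().__setitem__(key, None)
        for key, value in data.items():
            self[key] = value

    def _resolve(self, key: str) -> str:
        """Map any casing of a known key to its original case."""
        return self._original_case.get(key.lower(), key)

    def __getitem__(self, key: str) -> Any:
        return super().__getitem__(self._resolve(key))

    def __setitem__(self, key: str, value: Any) -> None:
        super().__setitem__(self._resolve(key), value)

    def __contains__(self, key: object) -> bool:
        return isinstance(key, str) and super().__contains__(self._resolve(key))


record = CaseInsensitiveRecord({"userid": 1}, expected_keys=["UserId", "Email"])
print(record["USERID"], record["email"], "EMAIL" in record, list(record))
```

The missing `Email` value is present as `None`, and iteration yields the keys in their original case.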
@classmethod
def from_record_message(cls, record_message: AirbyteRecordMessage, *, stream_record_handler: StreamRecordHandler) -> StreamRecord:
Return a StreamRecord from a RecordMessage.
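The source for this method divides `emitted_at` by 1000 before building the extraction time, which treats the value as epoch milliseconds. A minimal sketch of that conversion, using the standard library's `timezone.utc` in place of `pytz.utc` and a hypothetical helper name:

```python
from datetime import datetime, timezone


def emitted_at_to_datetime(emitted_at_ms: int) -> datetime:
    """Convert an epoch timestamp in milliseconds to a UTC datetime."""
    return datetime.fromtimestamp(emitted_at_ms / 1000, tz=timezone.utc)


extracted_at = emitted_at_to_datetime(1_700_000_000_000)
print(extracted_at.isoformat())
```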
Inherited Members
- builtins.dict
- get
- setdefault
- pop
- popitem
- keys
- items
- values
- update
- fromkeys
- clear
- copy