airbyte.records
PyAirbyte Records module.
Understanding record handling in PyAirbyte
PyAirbyte models record handling after Airbyte's "Destination V2" ("Dv2") record handling, including the implementation details described below.
Field Name Normalization
- PyAirbyte normalizes top-level record keys to lowercase, replacing spaces and hyphens with underscores.
- PyAirbyte does not normalize nested keys on sub-properties.
For example, the following record:
```json
{
  "My-Field": "value",
  "Nested": {
    "MySubField": "value"
  }
}
```
would be normalized to:
```json
{
  "my_field": "value",
  "nested": {
    "MySubField": "value"
  }
}
```
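The two rules can be sketched as a small standalone function. `normalize_key` and `normalize_top_level_keys` are hypothetical helpers written for illustration. PyAirbyte itself delegates to a normalizer class (the module imports `LowerCaseNormalizer` from `airbyte._util.name_normalizers`), which may treat other special characters differently.

```python
from __future__ import annotations

from typing import Any


def normalize_key(key: str) -> str:
    """Lowercase a key and replace spaces and hyphens with underscores."""
    return key.lower().replace(" ", "_").replace("-", "_")


def normalize_top_level_keys(record: dict[str, Any]) -> dict[str, Any]:
    """Normalize only top-level keys; nested dictionaries are passed through untouched."""
    return {normalize_key(key): value for key, value in record.items()}


record = {"My-Field": "value", "Nested": {"MySubField": "value"}}
print(normalize_top_level_keys(record))
# {'my_field': 'value', 'nested': {'MySubField': 'value'}}
```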
Table Name Normalization
Similar to column handling, PyAirbyte normalizes table names to the lowercase version of the stream name and may remove or normalize special characters.
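The text does not pin down the exact handling of special characters ("may remove or normalize"). The sketch below assumes one plausible rule: it collapses every run of characters other than lowercase letters, digits, and underscores into a single underscore. `normalize_table_name` is hypothetical and is not PyAirbyte's implementation.

```python
import re


def normalize_table_name(stream_name: str) -> str:
    """Hypothetical rule: lowercase, then collapse each run of other characters to '_'."""
    return re.sub(r"[^a-z0-9_]+", "_", stream_name.lower())


print(normalize_table_name("Customer Orders"))   # customer_orders
print(normalize_table_name("purchase-history"))  # purchase_history
```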
Airbyte-Managed Metadata Columns
PyAirbyte adds the following columns to every record:
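- `_airbyte_raw_id`: A unique identifier for the record (a ULID string).
- `_airbyte_extracted_at`: The time the record was extracted, derived from the record message's `emitted_at` timestamp.
- `_airbyte_meta`: A dictionary of extra metadata about the record.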
The names of these columns are included in the `airbyte.constants` module for programmatic reference.
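One common programmatic use of these names is stripping the metadata columns before handing a record to downstream code. The sketch below defines the names locally so it runs on its own. It assumes the `_airbyte_*` values listed in the `StreamRecord` docstring. In real code you would import them instead (the module source imports `AB_INTERNAL_COLUMNS` and the individual column constants from `airbyte.constants`). `without_metadata` is a hypothetical helper.

```python
from __future__ import annotations

from typing import Any

# Assumed values, taken from the StreamRecord docstring. In PyAirbyte, import them
# instead, e.g. `from airbyte.constants import AB_INTERNAL_COLUMNS`.
AB_RAW_ID_COLUMN = "_airbyte_raw_id"
AB_EXTRACTED_AT_COLUMN = "_airbyte_extracted_at"
AB_META_COLUMN = "_airbyte_meta"
AB_INTERNAL_COLUMNS = (AB_RAW_ID_COLUMN, AB_EXTRACTED_AT_COLUMN, AB_META_COLUMN)


def without_metadata(record: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: return a copy of the record without Airbyte-managed columns."""
    return {key: value for key, value in record.items() if key not in AB_INTERNAL_COLUMNS}


row = {
    "id": 1,
    "name": "Ada",
    AB_RAW_ID_COLUMN: "example-raw-id",
    AB_EXTRACTED_AT_COLUMN: "2024-01-01T00:00:00+00:00",
    AB_META_COLUMN: {},
}
print(without_metadata(row))  # {'id': 1, 'name': 'Ada'}
```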
Schema Evolution
PyAirbyte supports a very basic form of schema evolution:
- Columns are always auto-added to cache tables whenever newly arriving properties are detected as not present in the cache table.
- Column types will not be modified or expanded to fit changed types in the source catalog.
  - If column types change, we recommend that users manually alter the column types.
- At any time, users can run a full sync with a `WriteStrategy` of 'replace'. This creates a fresh table from scratch and then swaps the old and new tables after the table sync is complete.
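Both rules can be illustrated with a rough sketch that uses Python's standard-library `sqlite3` module as a stand-in for a cache database. `add_missing_columns` only ever adds columns and never alters existing ones. `replace_table` loads a fresh staging table and swaps it in only after loading has finished. Both helpers, the `__replace` staging suffix, and the all-`TEXT` column typing are assumptions made for the example, not PyAirbyte's cache code.

```python
from __future__ import annotations

import sqlite3
from typing import Any


def add_missing_columns(conn: sqlite3.Connection, table: str, record: dict[str, Any]) -> list[str]:
    """Add a column for each record key the table lacks; never alter existing columns."""
    existing = {row[1] for row in conn.execute(f'PRAGMA table_info("{table}")')}
    added = [key for key in record if key not in existing]
    for column in added:
        # New columns are declared TEXT purely for simplicity in this sketch.
        conn.execute(f'ALTER TABLE "{table}" ADD COLUMN "{column}" TEXT')
    return added


def replace_table(conn: sqlite3.Connection, table: str, rows: list[dict[str, Any]]) -> None:
    """Load rows into a fresh staging table, then swap it in for the old table."""
    if not rows:
        raise ValueError("this sketch needs at least one row to infer columns")
    staging = f"{table}__replace"
    columns = sorted({key for row in rows for key in row})
    column_sql = ", ".join(f'"{c}" TEXT' for c in columns)
    conn.execute(f'DROP TABLE IF EXISTS "{staging}"')
    conn.execute(f'CREATE TABLE "{staging}" ({column_sql})')
    placeholders = ", ".join("?" for _ in columns)
    conn.executemany(
        f'INSERT INTO "{staging}" VALUES ({placeholders})',
        [[row.get(c) for c in columns] for row in rows],
    )
    conn.commit()  # The staging table is fully loaded before the swap begins.
    # Swap old and new tables inside one explicit transaction.
    conn.execute("BEGIN")
    conn.execute(f'DROP TABLE IF EXISTS "{table}"')
    conn.execute(f'ALTER TABLE "{staging}" RENAME TO "{table}"')
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE "users" ("id" INTEGER)')
print(add_missing_columns(conn, "users", {"id": 1, "email": "ada@example.com"}))  # ['email']
replace_table(conn, "users", [{"id": "1", "name": "Ada"}])
print([row[1] for row in conn.execute('PRAGMA table_info("users")')])  # ['id', 'name']
```

Building the replacement table separately means readers of the old table never see a half-loaded result.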
````python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte Records module.

## Understanding record handling in PyAirbyte

PyAirbyte models record handling after Airbyte's "Destination V2" ("Dv2") record handling. This
includes the below implementation details.

### Field Name Normalization

1. PyAirbyte normalizes top-level record keys to lowercase, replacing spaces and hyphens with
   underscores.
2. PyAirbyte does not normalize nested keys on sub-properties.

For example, the following record:

```json
{

    "My-Field": "value",
    "Nested": {
        "MySubField": "value"
    }
}
```

Would be normalized to:

```json
{
    "my_field": "value",
    "nested": {
        "MySubField": "value"
    }
}
```

### Table Name Normalization

Similar to column handling, PyAirbyte normalizes table names to the lowercase version of the stream
name and may remove or normalize special characters.

### Airbyte-Managed Metadata Columns

PyAirbyte adds the following columns to every record:

- `ab_raw_id`: A unique identifier for the record.
- `ab_extracted_at`: The time the record was extracted.
- `ab_meta`: A dictionary of extra metadata about the record.

The names of these columns are included in the `airbyte.constants` module for programmatic
reference.

## Schema Evolution

PyAirbyte supports a very basic form of schema evolution:

1. Columns are always auto-added to cache tables whenever newly arriving properties are detected
   as not present in the cache table.
2. Column types will not be modified or expanded to fit changed types in the source catalog.
   - If column types change, we recommend user to manually alter the column types.
3. At any time, users can run a full sync with a `WriteStrategy` of 'replace'. This will create a
   fresh table from scratch and then swap the old and new tables after table sync is complete.

---

"""

from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Any

import pytz
import ulid

from airbyte._util.name_normalizers import LowerCaseNormalizer, NameNormalizerBase
from airbyte.constants import (
    AB_EXTRACTED_AT_COLUMN,
    AB_INTERNAL_COLUMNS,
    AB_META_COLUMN,
    AB_RAW_ID_COLUMN,
)


if TYPE_CHECKING:
    from airbyte_protocol.models import (
        AirbyteRecordMessage,
    )


class StreamRecord(dict[str, Any]):
    """The StreamRecord class is a case-aware, case-insensitive dictionary implementation.

    It has these behaviors:
    - When a key is retrieved, deleted, or checked for existence, it is always checked in a
      case-insensitive manner.
    - The original case is stored in a separate dictionary, so that the original case can be
      retrieved when needed.
    - Because it is subclassed from `dict`, the `StreamRecord` class can be passed as a normal
      Python dictionary.
    - In addition to the properties of the stream's records, the dictionary also stores the Airbyte
      metadata columns: `_airbyte_raw_id`, `_airbyte_extracted_at`, and `_airbyte_meta`.

    This behavior mirrors how a case-aware, case-insensitive SQL database would handle column
    references.

    There are two ways this class can store keys internally:
    - If normalize_keys is True, the keys are normalized using the given normalizer.
    - If normalize_keys is False, the original case of the keys is stored.

    In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the
    dictionary will be initialized with the given keys. If a key is not found in the input data, it
    will be initialized with a value of None. When provided, the 'expected_keys' input will also
    determine the original case of the keys.
    """

    def _display_case(self, key: str) -> str:
        """Return the original case of the key."""
        return self._pretty_case_keys[self._normalizer.normalize(key)]

    def _index_case(self, key: str) -> str:
        """Return the internal case of the key.

        If normalize_keys is True, return the normalized key.
        Otherwise, return the original case of the key.
        """
        if self._normalize_keys:
            return self._normalizer.normalize(key)

        return self._display_case(key)

    @classmethod
    def from_record_message(
        cls,
        record_message: AirbyteRecordMessage,
        *,
        prune_extra_fields: bool,
        normalize_keys: bool = True,
        normalizer: type[NameNormalizerBase] | None = None,
        expected_keys: list[str] | None = None,
    ) -> StreamRecord:
        """Return a StreamRecord from a RecordMessage."""
        data_dict: dict[str, Any] = record_message.data.copy()
        data_dict[AB_RAW_ID_COLUMN] = str(ulid.ULID())
        data_dict[AB_EXTRACTED_AT_COLUMN] = datetime.fromtimestamp(
            record_message.emitted_at / 1000, tz=pytz.utc
        )
        data_dict[AB_META_COLUMN] = {}

        return cls(
            from_dict=data_dict,
            prune_extra_fields=prune_extra_fields,
            normalize_keys=normalize_keys,
            normalizer=normalizer,
            expected_keys=expected_keys,
        )

    def __init__(
        self,
        from_dict: dict,
        *,
        prune_extra_fields: bool,
        normalize_keys: bool = True,
        normalizer: type[NameNormalizerBase] | None = None,
        expected_keys: list[str] | None = None,
    ) -> None:
        """Initialize the dictionary with the given data.

        Args:
            from_dict: The dictionary to initialize the StreamRecord with.
            prune_extra_fields: If `True`, unexpected fields will be removed.
            normalize_keys: If `True`, the keys will be normalized using the given normalizer.
            normalizer: The normalizer to use when normalizing keys. If not provided, the
                LowerCaseNormalizer will be used.
            expected_keys: If provided and `prune_extra_fields` is True, then unexpected fields
                will be removed. This option is ignored if `expected_keys` is not provided.
        """
        # If no normalizer is provided, use LowerCaseNormalizer.
        self._normalize_keys = normalize_keys
        self._normalizer: type[NameNormalizerBase] = normalizer or LowerCaseNormalizer

        # If no expected keys are provided, use all keys from the input dictionary.
        if not expected_keys:
            expected_keys = list(from_dict.keys())
            prune_extra_fields = False  # No expected keys provided.
        else:
            expected_keys = list(expected_keys)

        for internal_col in AB_INTERNAL_COLUMNS:
            if internal_col not in expected_keys:
                expected_keys.append(internal_col)

        # Store a lookup from normalized keys to pretty cased (originally cased) keys.
        self._pretty_case_keys: dict[str, str] = {
            self._normalizer.normalize(pretty_case.lower()): pretty_case
            for pretty_case in expected_keys
        }

        if normalize_keys:
            index_keys = [self._normalizer.normalize(key) for key in expected_keys]
        else:
            index_keys = expected_keys

        self.update(dict.fromkeys(index_keys))  # Start by initializing all values to None
        for k, v in from_dict.items():
            index_cased_key = self._index_case(k)
            if prune_extra_fields and index_cased_key not in index_keys:
                # Dropping undeclared field
                continue

            self[index_cased_key] = v

    def __getitem__(self, key: str) -> Any:  # noqa: ANN401
        if super().__contains__(key):
            return super().__getitem__(key)

        if super().__contains__(self._index_case(key)):
            return super().__getitem__(self._index_case(key))

        raise KeyError(key)

    def __setitem__(self, key: str, value: Any) -> None:  # noqa: ANN401
        if super().__contains__(key):
            super().__setitem__(key, value)
            return

        if super().__contains__(self._index_case(key)):
            super().__setitem__(self._index_case(key), value)
            return

        # Store the pretty cased (originally cased) key:
        self._pretty_case_keys[self._normalizer.normalize(key)] = key

        # Store the data with the normalized key:
        super().__setitem__(self._index_case(key), value)

    def __delitem__(self, key: str) -> None:
        if super().__contains__(key):
            super().__delitem__(key)
            return

        if super().__contains__(self._index_case(key)):
            super().__delitem__(self._index_case(key))
            return

        raise KeyError(key)

    def __contains__(self, key: object) -> bool:
        assert isinstance(key, str), "Key must be a string."
        return super().__contains__(key) or super().__contains__(self._index_case(key))

    def __iter__(self) -> Any:  # noqa: ANN401
        return iter(super().__iter__())

    def __len__(self) -> int:
        return super().__len__()

    def __eq__(self, other: object) -> bool:
        if isinstance(other, StreamRecord):
            return dict(self) == dict(other)

        if isinstance(other, dict):
            return {k.lower(): v for k, v in self.items()} == {
                k.lower(): v for k, v in other.items()
            }
        return False

    def __hash__(self) -> int:  # type: ignore [override]  # Doesn't match superclass (dict)
        """Return the hash of the dictionary with keys sorted."""
        items = [(k, v) for k, v in self.items() if not isinstance(v, dict)]
        return hash(tuple(sorted(items)))
````
The StreamRecord class is a case-aware, case-insensitive dictionary implementation.
It has these behaviors:
- When a key is retrieved, deleted, or checked for existence, it is always checked in a case-insensitive manner.
- The original case is stored in a separate dictionary, so that the original case can be retrieved when needed.
- Because it is subclassed from `dict`, the `StreamRecord` class can be passed as a normal Python dictionary.
- In addition to the properties of the stream's records, the dictionary also stores the Airbyte metadata columns: `_airbyte_raw_id`, `_airbyte_extracted_at`, and `_airbyte_meta`.
This behavior mirrors how a case-aware, case-insensitive SQL database would handle column references.
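The lookup behavior described above can be illustrated with a deliberately small, hypothetical class. It assumes plain lowercasing as the normalizer. It also omits the `normalize_keys=False` mode, `expected_keys`, and the metadata columns, all of which the real `StreamRecord` handles.

```python
from __future__ import annotations

from typing import Any


class CaseInsensitiveDict(dict):
    """Hypothetical sketch: keys are stored lowercased; the original spelling is kept aside."""

    def __init__(self, data: dict[str, Any] | None = None) -> None:
        super().__init__()
        self._display_case: dict[str, str] = {}
        for key, value in (data or {}).items():
            self[key] = value

    def __setitem__(self, key: str, value: Any) -> None:
        # Remember the first spelling seen for this key, then store under the lowercase key.
        self._display_case.setdefault(key.lower(), key)
        super().__setitem__(key.lower(), value)

    def __getitem__(self, key: str) -> Any:
        return super().__getitem__(key.lower())

    def __delitem__(self, key: str) -> None:
        super().__delitem__(key.lower())
        self._display_case.pop(key.lower(), None)

    def __contains__(self, key: object) -> bool:
        return isinstance(key, str) and super().__contains__(key.lower())

    def display_case(self, key: str) -> str:
        """Return the key as it was originally spelled."""
        return self._display_case[key.lower()]


record = CaseInsensitiveDict({"UserID": 42})
print(record["userid"], record["USERID"])  # 42 42
print("UserId" in record)                  # True
print(record.display_case("userid"))       # UserID
```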
There are two ways this class can store keys internally:
- If normalize_keys is True, the keys are normalized using the given normalizer.
- If normalize_keys is False, the original case of the keys is stored.
With regard to missing values, the dictionary accepts an 'expected_keys' input. When set, the dictionary is initialized with the given keys, and any key not found in the input data is initialized with a value of None. When provided, the 'expected_keys' input also determines the original case of the keys. If 'expected_keys' is not provided, all keys from the input data are used and pruning of extra fields is disabled.
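The `expected_keys` and `prune_extra_fields` rules can be sketched as a plain function. `build_record` is hypothetical. It lowercases keys in place of a configurable normalizer, and it ignores metadata columns. This differs from the real constructor, which always appends the Airbyte metadata columns to the expected keys.

```python
from __future__ import annotations

from typing import Any


def build_record(
    data: dict[str, Any],
    expected_keys: list[str] | None = None,
    *,
    prune_extra_fields: bool = False,
) -> dict[str, Any]:
    """Hypothetical helper mirroring the expected_keys / prune_extra_fields rules."""
    if not expected_keys:
        # Without expected keys, every input key is kept and pruning is disabled.
        expected_keys = list(data)
        prune_extra_fields = False
    # Start with every expected key present and set to None.
    record: dict[str, Any] = dict.fromkeys(key.lower() for key in expected_keys)
    for key, value in data.items():
        if prune_extra_fields and key.lower() not in record:
            continue  # Drop the undeclared field.
        record[key.lower()] = value
    return record


print(build_record({"Id": 1, "Extra": "x"}, ["id", "Name"]))
# {'id': 1, 'name': None, 'extra': 'x'}
print(build_record({"Id": 1, "Extra": "x"}, ["id", "Name"], prune_extra_fields=True))
# {'id': 1, 'name': None}
```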
```python
@classmethod
def from_record_message(
    cls,
    record_message: AirbyteRecordMessage,
    *,
    prune_extra_fields: bool,
    normalize_keys: bool = True,
    normalizer: type[NameNormalizerBase] | None = None,
    expected_keys: list[str] | None = None,
) -> StreamRecord:
```
Return a StreamRecord from a RecordMessage.
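The method first copies the message's `data`. It then adds three values: a raw ID, an extraction timestamp derived from `emitted_at` (in milliseconds), and an empty metadata dictionary. These steps can be sketched without the PyAirbyte dependencies. The sketch makes several substitutions, all of them assumptions:

- `FakeRecordMessage` is a hypothetical stand-in for `AirbyteRecordMessage`.
- `uuid.uuid4()` replaces the ULID used by the source.
- `datetime.timezone.utc` replaces `pytz.utc`.
- The column names come from the class docstring rather than being imported from `airbyte.constants`.

The key normalization performed by the `StreamRecord` constructor is omitted.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any


@dataclass
class FakeRecordMessage:
    """Hypothetical stand-in for airbyte_protocol's AirbyteRecordMessage."""

    data: dict[str, Any]
    emitted_at: int  # Milliseconds since the Unix epoch.


def to_stream_dict(message: FakeRecordMessage) -> dict[str, Any]:
    """Copy the payload and add the three Airbyte-managed metadata columns."""
    data = dict(message.data)  # Copy so the message itself is not mutated.
    data["_airbyte_raw_id"] = str(uuid.uuid4())  # The source uses str(ulid.ULID()) here.
    data["_airbyte_extracted_at"] = datetime.fromtimestamp(
        message.emitted_at / 1000, tz=timezone.utc
    )
    data["_airbyte_meta"] = {}
    return data


msg = FakeRecordMessage(data={"Name": "Ada"}, emitted_at=1_700_000_000_000)
row = to_stream_dict(msg)
print(row["_airbyte_extracted_at"].isoformat())  # 2023-11-14T22:13:20+00:00
```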
Inherited Members
- builtins.dict
  - get
  - setdefault
  - pop
  - popitem
  - keys
  - items
  - values
  - update
  - fromkeys
  - clear
  - copy
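The members above are inherited unchanged from `builtins.dict`, so they work on the keys exactly as stored. In CPython, C-implemented methods such as `get` do not call an overridden `__getitem__`. On a dict subclass they therefore likely bypass any case-insensitive lookup, and with `StreamRecord` the safe choice is to pass the key exactly as it is stored. The hypothetical class below demonstrates this CPython behavior in isolation; it is not `StreamRecord`.

```python
class LowerKeyDict(dict):
    """Hypothetical dict subclass that overrides __getitem__ only."""

    def __getitem__(self, key: str):
        return super().__getitem__(key.lower())


d = LowerKeyDict({"name": "Ada"})
print(d["NAME"])      # Ada (routed through the override)
print(d.get("NAME"))  # None (dict.get does not call __getitem__ in CPython)
```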