airbyte.records

PyAirbyte Records module.

Understanding record handling in PyAirbyte

PyAirbyte models record handling after Airbyte's "Destination V2" ("Dv2") record handling. The implementation details are described below.

Field Name Normalization

  1. PyAirbyte normalizes top-level record keys to lowercase, replacing spaces and hyphens with underscores.
  2. PyAirbyte does not normalize nested keys on sub-properties.

For example, the following record:

{
    "My-Field": "value",
    "Nested": {
        "MySubField": "value"
    }
}

Would be normalized to:

{
    "my_field": "value",
    "nested": {
        "MySubField": "value"
    }
}
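The two rules above can be approximated in a few lines of Python. This is a minimal sketch, not PyAirbyte's LowerCaseNormalizer: the helpers normalize_key and normalize_top_level_keys are hypothetical, and they implement only the rule stated above (lowercase, spaces and hyphens become underscores).

```python
from typing import Any


def normalize_key(key: str) -> str:
    """Hypothetical helper: lowercase, then replace spaces and hyphens with underscores."""
    return key.lower().replace(" ", "_").replace("-", "_")


def normalize_top_level_keys(record: dict[str, Any]) -> dict[str, Any]:
    """Normalize only the top-level keys; nested dictionaries are passed through unchanged."""
    return {normalize_key(key): value for key, value in record.items()}


record = {"My-Field": "value", "Nested": {"MySubField": "value"}}
print(normalize_top_level_keys(record))
# {'my_field': 'value', 'nested': {'MySubField': 'value'}}
```

Because the comprehension never recurses into the values, MySubField keeps its original casing, which matches rule 2.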

Table Name Normalization

Similar to column handling, PyAirbyte normalizes table names to the lowercase version of the stream name and may remove or normalize special characters.
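The exact special-character rules are not spelled out here, so the sketch below rests on an assumption: after lowercasing, every character other than a lowercase ASCII letter, digit, or underscore is replaced with an underscore. The function stream_to_table_name is hypothetical and only illustrates the general shape of the mapping.

```python
import re


def stream_to_table_name(stream_name: str) -> str:
    """Hypothetical: lowercase the stream name and replace other special characters with '_'."""
    # Assumption: characters outside [a-z0-9_] are replaced rather than removed.
    return re.sub(r"[^a-z0-9_]", "_", stream_name.lower())


print(stream_to_table_name("Purchase-Orders"))  # purchase_orders
```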

Airbyte-Managed Metadata Columns

PyAirbyte adds the following columns to every record:

  • _airbyte_raw_id: A unique identifier for the record.
  • _airbyte_extracted_at: The time the record was extracted.
  • _airbyte_meta: A dictionary of extra metadata about the record.

The names of these columns are defined as constants in the airbyte.constants module (AB_RAW_ID_COLUMN, AB_EXTRACTED_AT_COLUMN, and AB_META_COLUMN) for programmatic reference.

Schema Evolution

PyAirbyte supports a basic form of schema evolution:

  1. When newly arriving properties are not yet present in a cache table, the corresponding columns are always added to that table automatically.
  2. Column types are not modified or expanded to fit changed types in the source catalog.
    • If column types change, we recommend that users alter the column types manually.
  3. At any time, users can run a full sync with a WriteStrategy of 'replace'. This creates a fresh table from scratch and then swaps the old and new tables after the sync completes.
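Rules 1 and 2 can be illustrated with an in-memory SQLite table. This is a minimal sketch, not PyAirbyte's cache code: the helper add_missing_columns is hypothetical, new columns are assumed to be typed as TEXT, and the f-string SQL assumes trusted, already-normalized names.

```python
import sqlite3
from typing import Any


def add_missing_columns(conn: sqlite3.Connection, table: str, record: dict[str, Any]) -> list[str]:
    """Add a column for every record key the table lacks; existing column types are left alone."""
    # PRAGMA table_info yields one row per column: (cid, name, type, notnull, dflt_value, pk).
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    added: list[str] = []
    for key in record:
        if key not in existing:
            # Assumption: new columns are typed as TEXT; a real cache would derive the type
            # from the stream's JSON schema.
            conn.execute(f'ALTER TABLE "{table}" ADD COLUMN "{key}" TEXT')
            added.append(key)
    return added


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
print(add_missing_columns(conn, "users", {"id": 1, "name": "Ada", "email": "ada@example.com"}))
# ['email']
```

Note that the existing id column keeps its INTEGER type even if later records carry strings for it; per rule 2, such type changes are left to the user or to a 'replace' sync.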

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte Records module.

## Understanding record handling in PyAirbyte

PyAirbyte models record handling after Airbyte's "Destination V2" ("Dv2") record handling. The
implementation details are described below.

### Field Name Normalization

1. PyAirbyte normalizes top-level record keys to lowercase, replacing spaces and hyphens with
   underscores.
2. PyAirbyte does not normalize nested keys on sub-properties.

For example, the following record:

```json
{
    "My-Field": "value",
    "Nested": {
        "MySubField": "value"
    }
}
```

Would be normalized to:

```json
{
    "my_field": "value",
    "nested": {
        "MySubField": "value"
    }
}
```

### Table Name Normalization

Similar to column handling, PyAirbyte normalizes table names to the lowercase version of the stream
name and may remove or normalize special characters.

### Airbyte-Managed Metadata Columns

PyAirbyte adds the following columns to every record:

- `_airbyte_raw_id`: A unique identifier for the record.
- `_airbyte_extracted_at`: The time the record was extracted.
- `_airbyte_meta`: A dictionary of extra metadata about the record.

The names of these columns are defined as constants in the `airbyte.constants` module
(`AB_RAW_ID_COLUMN`, `AB_EXTRACTED_AT_COLUMN`, and `AB_META_COLUMN`) for programmatic reference.

## Schema Evolution

PyAirbyte supports a basic form of schema evolution:

1. When newly arriving properties are not yet present in a cache table, the corresponding columns
   are always added to that table automatically.
2. Column types are not modified or expanded to fit changed types in the source catalog.
   - If column types change, we recommend that users alter the column types manually.
3. At any time, users can run a full sync with a `WriteStrategy` of 'replace'. This creates a
   fresh table from scratch and then swaps the old and new tables after the sync completes.

---

"""

from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Any

import pytz
import ulid

from airbyte._util.name_normalizers import LowerCaseNormalizer, NameNormalizerBase
from airbyte.constants import (
    AB_EXTRACTED_AT_COLUMN,
    AB_INTERNAL_COLUMNS,
    AB_META_COLUMN,
    AB_RAW_ID_COLUMN,
)


if TYPE_CHECKING:
    from airbyte_protocol.models import (
        AirbyteRecordMessage,
    )


class StreamRecord(dict[str, Any]):
    """The StreamRecord class is a case-aware, case-insensitive dictionary implementation.

    It has these behaviors:
    - When a key is retrieved, deleted, or checked for existence, it is always checked in a
      case-insensitive manner.
    - The original case is stored in a separate dictionary, so that the original case can be
      retrieved when needed.
    - Because it is subclassed from `dict`, the `StreamRecord` class can be passed as a normal
      Python dictionary.
    - In addition to the properties of the stream's records, the dictionary also stores the Airbyte
      metadata columns: `_airbyte_raw_id`, `_airbyte_extracted_at`, and `_airbyte_meta`.

    This behavior mirrors how a case-aware, case-insensitive SQL database would handle column
    references.

    There are two ways this class can store keys internally:
    - If normalize_keys is True, the keys are normalized using the given normalizer.
    - If normalize_keys is False, the original case of the keys is stored.

    Regarding missing values, the dictionary accepts an 'expected_keys' input. When it is set, the
    dictionary is initialized with the given keys, and any key not found in the input data is
    initialized with a value of None. When provided, 'expected_keys' also determines the original
    case of the keys.
    """

    def _display_case(self, key: str) -> str:
        """Return the original case of the key."""
        return self._pretty_case_keys[self._normalizer.normalize(key)]

    def _index_case(self, key: str) -> str:
        """Return the internal case of the key.

        If normalize_keys is True, return the normalized key.
        Otherwise, return the original case of the key.
        """
        if self._normalize_keys:
            return self._normalizer.normalize(key)

        return self._display_case(key)

    @classmethod
    def from_record_message(
        cls,
        record_message: AirbyteRecordMessage,
        *,
        prune_extra_fields: bool,
        normalize_keys: bool = True,
        normalizer: type[NameNormalizerBase] | None = None,
        expected_keys: list[str] | None = None,
    ) -> StreamRecord:
        """Return a StreamRecord from a RecordMessage."""
        data_dict: dict[str, Any] = record_message.data.copy()
        data_dict[AB_RAW_ID_COLUMN] = str(ulid.ULID())
        data_dict[AB_EXTRACTED_AT_COLUMN] = datetime.fromtimestamp(
            record_message.emitted_at / 1000, tz=pytz.utc
        )
        data_dict[AB_META_COLUMN] = {}

        return cls(
            from_dict=data_dict,
            prune_extra_fields=prune_extra_fields,
            normalize_keys=normalize_keys,
            normalizer=normalizer,
            expected_keys=expected_keys,
        )

    def __init__(
        self,
        from_dict: dict,
        *,
        prune_extra_fields: bool,
        normalize_keys: bool = True,
        normalizer: type[NameNormalizerBase] | None = None,
        expected_keys: list[str] | None = None,
    ) -> None:
        """Initialize the dictionary with the given data.

        Args:
            from_dict: The dictionary to initialize the StreamRecord with.
            prune_extra_fields: If `True`, fields not listed in `expected_keys` are removed.
                Ignored if `expected_keys` is not provided.
            normalize_keys: If `True`, the keys will be normalized using the given normalizer.
            normalizer: The normalizer to use when normalizing keys. If not provided, the
                LowerCaseNormalizer will be used.
            expected_keys: The keys the record is expected to contain. Missing keys are set to
                None. If not provided, all keys of `from_dict` are used and no pruning occurs.
        """
        # If no normalizer is provided, use LowerCaseNormalizer.
        self._normalize_keys = normalize_keys
        self._normalizer: type[NameNormalizerBase] = normalizer or LowerCaseNormalizer

        # If no expected keys are provided, use all keys from the input dictionary.
        if not expected_keys:
            expected_keys = list(from_dict.keys())
            prune_extra_fields = False  # No expected keys provided.
        else:
            expected_keys = list(expected_keys)

        for internal_col in AB_INTERNAL_COLUMNS:
            if internal_col not in expected_keys:
                expected_keys.append(internal_col)

        # Store a lookup from normalized keys to pretty cased (originally cased) keys.
        self._pretty_case_keys: dict[str, str] = {
            self._normalizer.normalize(pretty_case.lower()): pretty_case
            for pretty_case in expected_keys
        }

        if normalize_keys:
            index_keys = [self._normalizer.normalize(key) for key in expected_keys]
        else:
            index_keys = expected_keys

        self.update(dict.fromkeys(index_keys))  # Start by initializing all values to None
        for k, v in from_dict.items():
            index_cased_key = self._index_case(k)
            if prune_extra_fields and index_cased_key not in index_keys:
                # Dropping undeclared field
                continue

            self[index_cased_key] = v

    def __getitem__(self, key: str) -> Any:  # noqa: ANN401
        if super().__contains__(key):
            return super().__getitem__(key)

        if super().__contains__(self._index_case(key)):
            return super().__getitem__(self._index_case(key))

        raise KeyError(key)

    def __setitem__(self, key: str, value: Any) -> None:  # noqa: ANN401
        if super().__contains__(key):
            super().__setitem__(key, value)
            return

        if super().__contains__(self._index_case(key)):
            super().__setitem__(self._index_case(key), value)
            return

        # Store the pretty cased (originally cased) key:
        self._pretty_case_keys[self._normalizer.normalize(key)] = key

        # Store the data with the normalized key:
        super().__setitem__(self._index_case(key), value)

    def __delitem__(self, key: str) -> None:
        if super().__contains__(key):
            super().__delitem__(key)
            return

        if super().__contains__(self._index_case(key)):
            super().__delitem__(self._index_case(key))
            return

        raise KeyError(key)

    def __contains__(self, key: object) -> bool:
        assert isinstance(key, str), "Key must be a string."
        return super().__contains__(key) or super().__contains__(self._index_case(key))

    def __iter__(self) -> Any:  # noqa: ANN401
        return iter(super().__iter__())

    def __len__(self) -> int:
        return super().__len__()

    def __eq__(self, other: object) -> bool:
        if isinstance(other, StreamRecord):
            return dict(self) == dict(other)

        if isinstance(other, dict):
            return {k.lower(): v for k, v in self.items()} == {
                k.lower(): v for k, v in other.items()
            }
        return False

    def __hash__(self) -> int:  # type: ignore [override]  # Doesn't match superclass (dict)
        """Return the hash of the dictionary with keys sorted."""
        items = [(k, v) for k, v in self.items() if not isinstance(v, dict)]
        return hash(tuple(sorted(items)))
class StreamRecord(dict[str, typing.Any]):

The StreamRecord class is a case-aware, case-insensitive dictionary implementation.

It has these behaviors:

  • When a key is retrieved, deleted, or checked for existence, it is always checked in a case-insensitive manner.
  • The original case is stored in a separate dictionary, so that the original case can be retrieved when needed.
  • Because it is subclassed from dict, the StreamRecord class can be passed as a normal Python dictionary.
  • In addition to the properties of the stream's records, the dictionary also stores the Airbyte metadata columns: _airbyte_raw_id, _airbyte_extracted_at, and _airbyte_meta.

This behavior mirrors how a case-aware, case-insensitive SQL database would handle column references.
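The lookup behavior described above can be sketched with a small dict subclass. This is a simplified illustration, not StreamRecord itself: the class name CaseInsensitiveRecord and its display_case method are hypothetical, and str.lower stands in for the pluggable normalizer.

```python
from typing import Any


class CaseInsensitiveRecord(dict[str, Any]):
    """Simplified sketch: keys are stored lowercased, original casing is kept on the side."""

    def __init__(self, data: dict[str, Any]) -> None:
        super().__init__()
        self._pretty_case_keys: dict[str, str] = {}  # lowercased key -> original key
        for key, value in data.items():
            self[key] = value  # routes through __setitem__ below

    def __setitem__(self, key: str, value: Any) -> None:
        # Remember the first casing seen for this key, then store under the lowercased key.
        self._pretty_case_keys.setdefault(key.lower(), key)
        super().__setitem__(key.lower(), value)

    def __getitem__(self, key: str) -> Any:
        return super().__getitem__(key.lower())

    def __delitem__(self, key: str) -> None:
        super().__delitem__(key.lower())
        self._pretty_case_keys.pop(key.lower(), None)

    def __contains__(self, key: object) -> bool:
        return isinstance(key, str) and super().__contains__(key.lower())

    def display_case(self, key: str) -> str:
        """Return the original casing recorded for key."""
        return self._pretty_case_keys[key.lower()]


rec = CaseInsensitiveRecord({"UserID": 7})
print(rec["userid"], rec["USERID"], "UserId" in rec, rec.display_case("userid"))
# 7 7 True UserID
```

One caveat of this design: dict methods implemented in C, such as get, do not route through an overridden __getitem__, so in this sketch rec.get("USERID") returns None even though rec["USERID"] returns 7.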

There are two ways this class can store keys internally:

  • If normalize_keys is True, the keys are normalized using the given normalizer.
  • If normalize_keys is False, the original case of the keys is stored.

Regarding missing values, the dictionary accepts an 'expected_keys' input. When it is set, the dictionary is initialized with the given keys, and any key not found in the input data is initialized with a value of None. When provided, 'expected_keys' also determines the original case of the keys.
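The expected_keys and prune_extra_fields handling in __init__ can be sketched as a plain function. This is a minimal sketch under stated assumptions: conform_to_expected_keys is a hypothetical name, str.lower stands in for the normalizer, and the real class additionally appends the Airbyte metadata columns to the expected keys, which is omitted here.

```python
from __future__ import annotations

from typing import Any


def conform_to_expected_keys(
    data: dict[str, Any],
    expected_keys: list[str] | None,
    *,
    prune_extra_fields: bool,
) -> dict[str, Any]:
    """Hypothetical helper mirroring the expected_keys logic of StreamRecord.__init__."""
    # Without expected keys, every input key is expected and nothing can be pruned.
    if not expected_keys:
        expected_keys = list(data)
        prune_extra_fields = False
    # Start with every expected key set to None (str.lower stands in for the normalizer).
    result: dict[str, Any] = dict.fromkeys(key.lower() for key in expected_keys)
    for key, value in data.items():
        index_key = key.lower()
        if prune_extra_fields and index_key not in result:
            continue  # drop a field that was not declared
        result[index_key] = value
    return result


print(conform_to_expected_keys({"Id": 1, "Extra": "x"}, ["id", "Name"], prune_extra_fields=True))
# {'id': 1, 'name': None}
```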

@classmethod
def from_record_message(cls, record_message: airbyte_protocol.models.airbyte_protocol.AirbyteRecordMessage, *, prune_extra_fields: bool, normalize_keys: bool = True, normalizer: type[airbyte._util.name_normalizers.NameNormalizerBase] | None = None, expected_keys: list[str] | None = None) -> StreamRecord:

Return a StreamRecord from a RecordMessage.
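Before delegating to the constructor, from_record_message copies the message data and adds the three metadata columns. The sketch below mirrors those steps with the standard library only. Assumptions: the helper with_metadata_columns is hypothetical, uuid.uuid4() stands in for the ULID the real code generates, timezone.utc stands in for pytz.utc, and the column names are the ones listed in the StreamRecord docstring.

```python
import uuid
from datetime import datetime, timezone
from typing import Any

# Column names as listed in the StreamRecord docstring (PyAirbyte exposes them via airbyte.constants).
AB_RAW_ID_COLUMN = "_airbyte_raw_id"
AB_EXTRACTED_AT_COLUMN = "_airbyte_extracted_at"
AB_META_COLUMN = "_airbyte_meta"


def with_metadata_columns(data: dict[str, Any], emitted_at_ms: int) -> dict[str, Any]:
    """Hypothetical helper mirroring the steps of from_record_message."""
    record = dict(data)  # copy so the caller's data is not mutated
    # Stand-in for str(ulid.ULID()): any unique string identifier works for this sketch.
    record[AB_RAW_ID_COLUMN] = str(uuid.uuid4())
    # emitted_at is in milliseconds since the epoch; convert to a timezone-aware UTC datetime.
    record[AB_EXTRACTED_AT_COLUMN] = datetime.fromtimestamp(emitted_at_ms / 1000, tz=timezone.utc)
    record[AB_META_COLUMN] = {}
    return record


row = with_metadata_columns({"id": 1}, emitted_at_ms=1_700_000_000_000)
print(row[AB_EXTRACTED_AT_COLUMN])  # 2023-11-14 22:13:20+00:00
```

A ULID is also sortable by creation time, which a random UUID is not; the swap here only preserves uniqueness.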

Inherited Members
builtins.dict
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy