airbyte.records

PyAirbyte Records module.

Understanding record handling in PyAirbyte

PyAirbyte models record handling after Airbyte's "Destination V2" ("Dv2") record handling. This includes the implementation details below.

Field Name Normalization

  1. PyAirbyte normalizes top-level record keys to lowercase, replacing spaces and hyphens with underscores.
  2. PyAirbyte does not normalize nested keys on sub-properties.

For example, the following record:

{
    "My-Field": "value",
    "Nested": {
        "MySubField": "value"
    }
}

Would be normalized to:

{
    "my_field": "value",
    "nested": {
        "MySubField": "value"
    }
}
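
The normalization rule above can be sketched in plain Python. This is an illustrative approximation, not PyAirbyte's actual `LowerCaseNormalizer`, and the helper name is hypothetical:

```python
import re

def normalize_top_level(record: dict) -> dict:
    # Lowercase each top-level key and replace spaces and hyphens with
    # underscores. Nested dictionaries are intentionally left untouched,
    # matching the behavior described above.
    return {re.sub(r"[ -]", "_", key).lower(): value for key, value in record.items()}

normalized = normalize_top_level(
    {"My-Field": "value", "Nested": {"MySubField": "value"}}
)
# {'my_field': 'value', 'nested': {'MySubField': 'value'}}
```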

Table Name Normalization

Similar to column handling, PyAirbyte normalizes table names to the lowercase version of the stream name and may remove or normalize special characters.
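
As a rough sketch, assuming a simplified rule (the exact special-character handling depends on the normalizer in use, and this helper is hypothetical), stream-to-table name normalization might look like:

```python
def to_table_name(stream_name: str) -> str:
    # Lowercase the stream name and replace any character that is not
    # alphanumeric with an underscore (an assumed, simplified rule).
    return "".join(c if c.isalnum() else "_" for c in stream_name).lower()

to_table_name("My Stream-Name")
# 'my_stream_name'
```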

Airbyte-Managed Metadata Columns

PyAirbyte adds the following columns to every record:

  • _airbyte_raw_id: A unique identifier for the record.
  • _airbyte_extracted_at: The time the record was extracted.
  • _airbyte_meta: A dictionary of extra metadata about the record.

The names of these columns are included in the airbyte.constants module for programmatic reference.
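
The shape of these metadata fields can be illustrated with a standalone sketch. Here `uuid4` stands in for the UUIDv7 generator PyAirbyte uses, and the `_airbyte_`-prefixed column names follow the constants module; in real code the names should come from `airbyte.constants` rather than being hard-coded:

```python
import uuid
from datetime import datetime, timezone

record = {"id": 1, "name": "alice"}

# Attach the Airbyte-managed metadata columns alongside the record's
# own properties, mirroring what PyAirbyte does on write.
record.update({
    "_airbyte_raw_id": str(uuid.uuid4()),         # unique per record
    "_airbyte_extracted_at": datetime.now(timezone.utc),
    "_airbyte_meta": {},                          # extra metadata
})
```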

Schema Evolution

PyAirbyte supports a very basic form of schema evolution:

  1. Columns are always auto-added to cache tables whenever newly arriving properties are detected as not present in the cache table.
  2. Column types will not be modified or expanded to fit changed types in the source catalog.
    • If column types change, we recommend users manually alter the column types.
  3. At any time, users can run a full sync with a WriteStrategy of 'replace'. This will create a fresh table from scratch and then swap the old and new tables after table sync is complete.
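
The table swap behind the 'replace' strategy can be sketched with plain SQLite. This is an illustration of the technique, not PyAirbyte's actual SQL, and the table names are hypothetical:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER)")
conn.execute("INSERT INTO users VALUES (1)")

# Write the full refresh into a fresh table built from scratch,
# here with an evolved schema...
conn.execute("CREATE TABLE users_new (id INTEGER, name TEXT)")
conn.execute("INSERT INTO users_new VALUES (1, 'alice')")

# ...then swap the old and new tables once the sync is complete.
conn.execute("ALTER TABLE users RENAME TO users_old")
conn.execute("ALTER TABLE users_new RENAME TO users")
conn.execute("DROP TABLE users_old")

rows = conn.execute("SELECT * FROM users").fetchall()
# rows == [(1, 'alice')]
```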

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte Records module.

## Understanding record handling in PyAirbyte

PyAirbyte models record handling after Airbyte's "Destination V2" ("Dv2") record handling. This
includes the implementation details below.

### Field Name Normalization

1. PyAirbyte normalizes top-level record keys to lowercase, replacing spaces and hyphens with
   underscores.
2. PyAirbyte does not normalize nested keys on sub-properties.

For example, the following record:

```json
{
    "My-Field": "value",
    "Nested": {
        "MySubField": "value"
    }
}
```

Would be normalized to:

```json
{
    "my_field": "value",
    "nested": {
        "MySubField": "value"
    }
}
```

### Table Name Normalization

Similar to column handling, PyAirbyte normalizes table names to the lowercase version of the stream
name and may remove or normalize special characters.

### Airbyte-Managed Metadata Columns

PyAirbyte adds the following columns to every record:

- `_airbyte_raw_id`: A unique identifier for the record.
- `_airbyte_extracted_at`: The time the record was extracted.
- `_airbyte_meta`: A dictionary of extra metadata about the record.

The names of these columns are included in the `airbyte.constants` module for programmatic
reference.

## Schema Evolution

PyAirbyte supports a very basic form of schema evolution:

1. Columns are always auto-added to cache tables whenever newly arriving properties are detected
   as not present in the cache table.
2. Column types will not be modified or expanded to fit changed types in the source catalog.
   - If column types change, we recommend users manually alter the column types.
3. At any time, users can run a full sync with a `WriteStrategy` of 'replace'. This will create a
   fresh table from scratch and then swap the old and new tables after table sync is complete.

---

"""

from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Any

import pytz
from uuid_extensions import uuid7str

from airbyte._util.name_normalizers import LowerCaseNormalizer, NameNormalizerBase
from airbyte.constants import (
    AB_EXTRACTED_AT_COLUMN,
    AB_INTERNAL_COLUMNS,
    AB_META_COLUMN,
    AB_RAW_ID_COLUMN,
)


if TYPE_CHECKING:
    from collections.abc import Iterator

    from airbyte_protocol.models import (
        AirbyteRecordMessage,
    )


class StreamRecordHandler:
    """A class to handle processing of StreamRecords.

    This is a long-lived object that can be used to process multiple StreamRecords, which
    saves on memory and processing time by reusing the same object for all records of the same type.
    """

    def __init__(
        self,
        *,
        json_schema: dict,
        normalizer: type[NameNormalizerBase] = LowerCaseNormalizer,
        normalize_keys: bool = False,
        prune_extra_fields: bool,
    ) -> None:
        """Initialize the handler with the given stream schema and options.

        Args:
            json_schema: The JSON Schema definition for this stream.
            normalizer: The normalizer to use when normalizing keys. If not provided, the
                LowerCaseNormalizer will be used.
            normalize_keys: If `True`, the keys will be normalized using the given normalizer.
            prune_extra_fields: If `True`, unexpected fields will be removed.
        """
        self._expected_keys: list[str] = list(json_schema.get("properties", {}).keys())
        self._normalizer: type[NameNormalizerBase] = normalizer
        self._normalize_keys: bool = normalize_keys
        self.prune_extra_fields: bool = prune_extra_fields

        self.index_keys: list[str] = [
            self._normalizer.normalize(key) if self._normalize_keys else key
            for key in self._expected_keys
        ]
        self.normalized_keys: list[str] = [
            self._normalizer.normalize(key) for key in self._expected_keys
        ]
        self.quick_lookup: dict[str, str]

        for internal_col in AB_INTERNAL_COLUMNS:
            if internal_col not in self._expected_keys:
                self._expected_keys.append(internal_col)

        # Store a lookup from normalized keys to pretty cased (originally cased) keys.
        self._pretty_case_lookup: dict[str, str] = {
            self._normalizer.normalize(pretty_case.lower()): pretty_case
            for pretty_case in self._expected_keys
        }
        # Store a map from all key versions (normalized and pretty-cased) to their normalized
        # version.
        self.quick_lookup = {
            key: self._normalizer.normalize(key)
            if self._normalize_keys
            else self.to_display_case(key)
            for key in set(self._expected_keys) | set(self._pretty_case_lookup.values())
        }

    def to_display_case(self, key: str) -> str:
        """Return the original case of the key.

        If the key is not found in the pretty case lookup, return the key provided.
        """
        return self._pretty_case_lookup.get(self._normalizer.normalize(key), key)

    def to_index_case(self, key: str) -> str:
        """Return the internal case of the key.

        If `normalize_keys` is True, returns the normalized key.
        Otherwise, return the original case ("pretty case") of the key.
        """
        try:
            return self.quick_lookup[key]
        except KeyError:
            result = (
                self._normalizer.normalize(key)
                if self._normalize_keys
                else self.to_display_case(key)
            )
            self.quick_lookup[key] = result
            return result


class StreamRecord(dict[str, Any]):
    """The StreamRecord class is a case-aware, case-insensitive dictionary implementation.

    It has these behaviors:
    - When a key is retrieved, deleted, or checked for existence, it is always checked in a
      case-insensitive manner.
    - The original case is stored in a separate dictionary, so that the original case can be
      retrieved when needed.
    - Because it is subclassed from `dict`, the `StreamRecord` class can be passed as a normal
      Python dictionary.
    - In addition to the properties of the stream's records, the dictionary also stores the Airbyte
      metadata columns: `_airbyte_raw_id`, `_airbyte_extracted_at`, and `_airbyte_meta`.

    This behavior mirrors how a case-aware, case-insensitive SQL database would handle column
    references.

    There are two ways this class can store keys internally:
    - If normalize_keys is True, the keys are normalized using the given normalizer.
    - If normalize_keys is False, the original case of the keys is stored.

    In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the
    dictionary will be initialized with the given keys. If a key is not found in the input data, it
    will be initialized with a value of None. When provided, the 'expected_keys' input will also
    determine the original case of the keys.
    """

    def __init__(
        self,
        from_dict: dict,
        *,
        stream_record_handler: StreamRecordHandler,
        with_internal_columns: bool = True,
        extracted_at: datetime | None = None,
    ) -> None:
        """Initialize the dictionary with the given data.

        Args:
            from_dict: The dictionary to initialize the StreamRecord with.
            stream_record_handler: The StreamRecordHandler to use for processing the record.
            with_internal_columns: If `True`, the internal columns will be added to the record.
            extracted_at: The time the record was extracted. If not provided, the current time will
                be used.
        """
        self._stream_handler: StreamRecordHandler = stream_record_handler

        # Start by initializing all values to None
        self.update(dict.fromkeys(stream_record_handler.index_keys))

        # Update the dictionary with the given data
        if self._stream_handler.prune_extra_fields:
            self.update(
                {
                    self._stream_handler.to_index_case(k): v
                    for k, v in from_dict.items()
                    if self._stream_handler.to_index_case(k) in self._stream_handler.index_keys
                }
            )
        else:
            self.update({self._stream_handler.to_index_case(k): v for k, v in from_dict.items()})

        if with_internal_columns:
            self.update(
                {
                    AB_RAW_ID_COLUMN: uuid7str(),
                    AB_EXTRACTED_AT_COLUMN: extracted_at or datetime.now(pytz.utc),
                    AB_META_COLUMN: {},
                }
            )

    @classmethod
    def from_record_message(
        cls,
        record_message: AirbyteRecordMessage,
        *,
        stream_record_handler: StreamRecordHandler,
    ) -> StreamRecord:
        """Return a StreamRecord from a RecordMessage."""
        data_dict: dict[str, Any] = record_message.data.copy()
        return cls(
            from_dict=data_dict,
            stream_record_handler=stream_record_handler,
            with_internal_columns=True,
            extracted_at=datetime.fromtimestamp(record_message.emitted_at / 1000, tz=pytz.utc),
        )

    def __getitem__(self, key: str) -> Any:  # noqa: ANN401
        """Return the item with the given key."""
        try:
            return super().__getitem__(key)
        except KeyError:
            return super().__getitem__(self._stream_handler.to_index_case(key))

    def __setitem__(self, key: str, value: Any) -> None:  # noqa: ANN401
        """Set the item with the given key to the given value."""
        index_case_key = self._stream_handler.to_index_case(key)
        if (
            self._stream_handler.prune_extra_fields
            and index_case_key not in self._stream_handler.index_keys
        ):
            return

        super().__setitem__(index_case_key, value)

    def __delitem__(self, key: str) -> None:
        """Delete the item with the given key."""
        try:
            super().__delitem__(key)
        except KeyError:
            index_case_key = self._stream_handler.to_index_case(key)
            if super().__contains__(index_case_key):
                super().__delitem__(index_case_key)
                return
        else:
            # No failure. Key was deleted.
            return

        raise KeyError(key)

    def __contains__(self, key: object) -> bool:
        """Return whether the dictionary contains the given key."""
        assert isinstance(key, str), "Key must be a string."
        return super().__contains__(key) or super().__contains__(
            self._stream_handler.to_index_case(key)
        )

    def __iter__(self) -> Iterator[str]:
        """Return an iterator over the keys of the dictionary."""
        return iter(super().__iter__())

    def __len__(self) -> int:
        """Return the number of items in the dictionary."""
        return super().__len__()

    def __eq__(self, other: object) -> bool:
        """Return whether the StreamRecord is equal to the given dict or StreamRecord object."""
        if isinstance(other, StreamRecord):
            return dict(self) == dict(other)

        if isinstance(other, dict):
            return {k.lower(): v for k, v in self.items()} == {
                k.lower(): v for k, v in other.items()
            }
        return False

    def __hash__(self) -> int:  # type: ignore [override]  # Doesn't match superclass (dict)
        """Return the hash of the dictionary with keys sorted."""
        items = [(k, v) for k, v in self.items() if not isinstance(v, dict)]
        return hash(tuple(sorted(items)))
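
The case-insensitive fallback that `StreamRecord.__getitem__` performs can be distilled into a minimal standalone sketch. This toy class is not the PyAirbyte implementation, which routes the retry through `StreamRecordHandler`'s precomputed lookup rather than scanning keys:

```python
from typing import Any

class CaseInsensitiveDict(dict):
    """Toy dict that falls back to a case-insensitive key match."""

    def __getitem__(self, key: str) -> Any:
        try:
            return super().__getitem__(key)
        except KeyError:
            # Fall back to scanning keys case-insensitively, analogous to
            # StreamRecord retrying with the handler's index-case key.
            for existing in self:
                if existing.lower() == key.lower():
                    return super().__getitem__(existing)
            raise

record = CaseInsensitiveDict({"My_Field": 1})
record["my_field"]
# 1
```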