airbyte.records

PyAirbyte Records module.

Understanding record handling in PyAirbyte

PyAirbyte models record handling after Airbyte's "Destination V2" ("Dv2") record handling. This includes the following implementation details.

Field Name Normalization

  1. PyAirbyte normalizes top-level record keys to lowercase, replacing spaces and hyphens with underscores.
  2. PyAirbyte does not normalize nested keys on sub-properties.

For example, the following record:

{
    "My-Field": "value",
    "Nested": {
        "MySubField": "value"
    }
}

Would be normalized to:

{
    "my_field": "value",
    "nested": {
        "MySubField": "value"
    }
}
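As a minimal sketch, the top-level-only normalization described above behaves like the following. This assumes a simple regex-based rule; the real `LowerCaseNormalizer` in `airbyte._util.name_normalizers` may handle additional cases.

```python
import re


def normalize_key(key: str) -> str:
    # Lowercase, and replace spaces and hyphens with underscores.
    return re.sub(r"[ -]", "_", key).lower()


def normalize_record(record: dict) -> dict:
    # Only top-level keys are normalized; nested keys are left untouched.
    return {normalize_key(k): v for k, v in record.items()}


normalized = normalize_record({"My-Field": "value", "Nested": {"MySubField": "value"}})
# normalized == {"my_field": "value", "nested": {"MySubField": "value"}}
```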

Table Name Normalization

Similar to column handling, PyAirbyte normalizes table names to the lowercase version of the stream name and may remove or normalize special characters.
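As an illustration only, a table-name normalization of this kind might look like the sketch below. The exact special-character handling here is hypothetical; the actual rules are defined by the normalizer PyAirbyte uses.

```python
import re


def normalize_table_name(stream_name: str) -> str:
    # Hypothetical rule: lowercase, then collapse runs of special
    # characters into single underscores.
    return re.sub(r"[^a-z0-9]+", "_", stream_name.lower()).strip("_")


normalize_table_name("My Stream-Name!")
# → "my_stream_name"
```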

Airbyte-Managed Metadata Columns

PyAirbyte adds the following columns to every record:

  • ab_raw_id: A unique identifier for the record.
  • ab_extracted_at: The time the record was extracted.
  • ab_meta: A dictionary of extra metadata about the record.

The names of these columns are included in the airbyte.constants module for programmatic reference.
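A sketch of what adding these columns looks like, using stdlib stand-ins. The real implementation generates the raw ID with `uuid7str()` and imports the column names from `airbyte.constants` rather than redefining them.

```python
import uuid
from datetime import datetime, timezone

# Stand-ins for the names exported by airbyte.constants:
AB_RAW_ID_COLUMN = "_airbyte_raw_id"
AB_EXTRACTED_AT_COLUMN = "_airbyte_extracted_at"
AB_META_COLUMN = "_airbyte_meta"


def with_metadata(record: dict) -> dict:
    # Annotate a record with the Airbyte-managed metadata columns.
    return {
        **record,
        AB_RAW_ID_COLUMN: str(uuid.uuid4()),  # PyAirbyte uses a UUIDv7 here
        AB_EXTRACTED_AT_COLUMN: datetime.now(timezone.utc),
        AB_META_COLUMN: {},
    }
```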

Schema Evolution

PyAirbyte supports a very basic form of schema evolution:

  1. Columns are always auto-added to cache tables whenever newly arriving properties are detected as not present in the cache table.
  2. Column types will not be modified or expanded to fit changed types in the source catalog.
    • If column types change, we recommend users manually alter the column types.
  3. At any time, users can run a full sync with a WriteStrategy of 'replace'. This will create a fresh table from scratch and then swap the old and new tables after table sync is complete.
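The create-then-swap behavior of the 'replace' strategy can be illustrated with an in-memory sqlite3 sketch. This is not PyAirbyte's actual SQL, which depends on the cache backend, but it shows the pattern: a fresh table is built from scratch, then swapped in for the old one.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER)")
conn.execute("INSERT INTO users VALUES (1)")

# 'replace' strategy: build a fresh table from scratch...
conn.execute("CREATE TABLE users_new (id INTEGER, email TEXT)")
conn.execute("INSERT INTO users_new VALUES (2, 'new@example.com')")

# ...then swap the old and new tables once the sync is complete.
conn.execute("ALTER TABLE users RENAME TO users_old")
conn.execute("ALTER TABLE users_new RENAME TO users")
conn.execute("DROP TABLE users_old")

rows = conn.execute("SELECT * FROM users").fetchall()
# rows == [(2, 'new@example.com')]
```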

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte Records module.

## Understanding record handling in PyAirbyte

PyAirbyte models record handling after Airbyte's "Destination V2" ("Dv2") record handling. This
includes the following implementation details.

### Field Name Normalization

1. PyAirbyte normalizes top-level record keys to lowercase, replacing spaces and hyphens with
   underscores.
2. PyAirbyte does not normalize nested keys on sub-properties.

For example, the following record:

```json
{
    "My-Field": "value",
    "Nested": {
        "MySubField": "value"
    }
}
```

Would be normalized to:

```json
{
    "my_field": "value",
    "nested": {
        "MySubField": "value"
    }
}
```

### Table Name Normalization

Similar to column handling, PyAirbyte normalizes table names to the lowercase version of the stream
name and may remove or normalize special characters.

### Airbyte-Managed Metadata Columns

PyAirbyte adds the following columns to every record:

- `ab_raw_id`: A unique identifier for the record.
- `ab_extracted_at`: The time the record was extracted.
- `ab_meta`: A dictionary of extra metadata about the record.

The names of these columns are included in the `airbyte.constants` module for programmatic
reference.

## Schema Evolution

PyAirbyte supports a very basic form of schema evolution:

1. Columns are always auto-added to cache tables whenever newly arriving properties are detected
   as not present in the cache table.
2. Column types will not be modified or expanded to fit changed types in the source catalog.
   - If column types change, we recommend users manually alter the column types.
3. At any time, users can run a full sync with a `WriteStrategy` of 'replace'. This will create a
   fresh table from scratch and then swap the old and new tables after table sync is complete.

---

"""

from __future__ import annotations

from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any

from uuid_extensions import uuid7str

from airbyte._util.name_normalizers import LowerCaseNormalizer, NameNormalizerBase
from airbyte.constants import (
    AB_EXTRACTED_AT_COLUMN,
    AB_INTERNAL_COLUMNS,
    AB_META_COLUMN,
    AB_RAW_ID_COLUMN,
)


if TYPE_CHECKING:
    from collections.abc import Iterator

    from airbyte_protocol.models import (
        AirbyteRecordMessage,
    )


class StreamRecordHandler:
    """A class to handle processing of StreamRecords.

    This is a long-lived object that can be used to process multiple StreamRecords, which
    saves on memory and processing time by reusing the same object for all records of the same type.
    """

    def __init__(
        self,
        *,
        json_schema: dict,
        normalizer: type[NameNormalizerBase] = LowerCaseNormalizer,
        normalize_keys: bool = False,
        prune_extra_fields: bool,
    ) -> None:
        """Initialize the dictionary with the given data.

        Args:
            json_schema: The JSON Schema definition for this stream.
            normalizer: The normalizer to use when normalizing keys. If not provided, the
                LowerCaseNormalizer will be used.
            normalize_keys: If `True`, the keys will be normalized using the given normalizer.
            prune_extra_fields: If `True`, unexpected fields will be removed.
        """
        self._expected_keys: list[str] = list(json_schema.get("properties", {}).keys())
        self._normalizer: type[NameNormalizerBase] = normalizer
        self._normalize_keys: bool = normalize_keys
        self.prune_extra_fields: bool = prune_extra_fields

        self.index_keys: list[str] = [
            self._normalizer.normalize(key) if self._normalize_keys else key
            for key in self._expected_keys
        ]
        self.normalized_keys: list[str] = [
            self._normalizer.normalize(key) for key in self._expected_keys
        ]
        self.quick_lookup: dict[str, str]

        for internal_col in AB_INTERNAL_COLUMNS:
            if internal_col not in self._expected_keys:
                self._expected_keys.append(internal_col)

        # Store a lookup from normalized keys to pretty cased (originally cased) keys.
        self._pretty_case_lookup: dict[str, str] = {
            self._normalizer.normalize(pretty_case.lower()): pretty_case
            for pretty_case in self._expected_keys
        }
        # Store a map from all key versions (normalized and pretty-cased) to their normalized
        # version.
        self.quick_lookup = {
            key: self._normalizer.normalize(key)
            if self._normalize_keys
            else self.to_display_case(key)
            for key in set(self._expected_keys) | set(self._pretty_case_lookup.values())
        }

    def to_display_case(self, key: str) -> str:
        """Return the original case of the key.

        If the key is not found in the pretty case lookup, return the key provided.
        """
        return self._pretty_case_lookup.get(self._normalizer.normalize(key), key)

    def to_index_case(self, key: str) -> str:
        """Return the internal case of the key.

        If `normalize_keys` is True, returns the normalized key.
        Otherwise, return the original case ("pretty case") of the key.
        """
        try:
            return self.quick_lookup[key]
        except KeyError:
            result = (
                self._normalizer.normalize(key)
                if self._normalize_keys
                else self.to_display_case(key)
            )
            self.quick_lookup[key] = result
            return result

class StreamRecord(dict[str, Any]):
    """The StreamRecord class is a case-aware, case-insensitive dictionary implementation.

    It has these behaviors:
    - When a key is retrieved, deleted, or checked for existence, it is always checked in a
      case-insensitive manner.
    - The original case is stored in a separate dictionary, so that the original case can be
      retrieved when needed.
    - Because it is subclassed from `dict`, the `StreamRecord` class can be passed as a normal
      Python dictionary.
    - In addition to the properties of the stream's records, the dictionary also stores the Airbyte
      metadata columns: `_airbyte_raw_id`, `_airbyte_extracted_at`, and `_airbyte_meta`.

    This behavior mirrors how a case-aware, case-insensitive SQL database would handle column
    references.

    There are two ways this class can store keys internally:
    - If normalize_keys is True, the keys are normalized using the given normalizer.
    - If normalize_keys is False, the original case of the keys is stored.

    In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the
    dictionary will be initialized with the given keys. If a key is not found in the input data, it
    will be initialized with a value of None. When provided, the 'expected_keys' input will also
    determine the original case of the keys.
    """

    def __init__(
        self,
        from_dict: dict,
        *,
        stream_record_handler: StreamRecordHandler,
        with_internal_columns: bool = True,
        extracted_at: datetime | None = None,
    ) -> None:
        """Initialize the dictionary with the given data.

        Args:
            from_dict: The dictionary to initialize the StreamRecord with.
            stream_record_handler: The StreamRecordHandler to use for processing the record.
            with_internal_columns: If `True`, the internal columns will be added to the record.
            extracted_at: The time the record was extracted. If not provided, the current time will
                be used.
        """
        self._stream_handler: StreamRecordHandler = stream_record_handler

        # Start by initializing all values to None
        self.update(dict.fromkeys(stream_record_handler.index_keys))

        # Update the dictionary with the given data
        if self._stream_handler.prune_extra_fields:
            self.update(
                {
                    self._stream_handler.to_index_case(k): v
                    for k, v in from_dict.items()
                    if self._stream_handler.to_index_case(k) in self._stream_handler.index_keys
                }
            )
        else:
            self.update({self._stream_handler.to_index_case(k): v for k, v in from_dict.items()})

        if with_internal_columns:
            self.update(
                {
                    AB_RAW_ID_COLUMN: uuid7str(),
                    AB_EXTRACTED_AT_COLUMN: extracted_at or datetime.now(timezone.utc),
                    AB_META_COLUMN: {},
                }
            )

    @classmethod
    def from_record_message(
        cls,
        record_message: AirbyteRecordMessage,
        *,
        stream_record_handler: StreamRecordHandler,
    ) -> StreamRecord:
        """Return a StreamRecord from a RecordMessage."""
        data_dict: dict[str, Any] = record_message.data.copy()
        return cls(
            from_dict=data_dict,
            stream_record_handler=stream_record_handler,
            with_internal_columns=True,
            extracted_at=datetime.fromtimestamp(record_message.emitted_at / 1000, tz=timezone.utc),
        )

    def __getitem__(self, key: str) -> Any:  # noqa: ANN401
        """Return the item with the given key."""
        try:
            return super().__getitem__(key)
        except KeyError:
            return super().__getitem__(self._stream_handler.to_index_case(key))

    def __setitem__(self, key: str, value: Any) -> None:  # noqa: ANN401
        """Set the item with the given key to the given value."""
        index_case_key = self._stream_handler.to_index_case(key)
        if (
            self._stream_handler.prune_extra_fields
            and index_case_key not in self._stream_handler.index_keys
        ):
            return

        super().__setitem__(index_case_key, value)

    def __delitem__(self, key: str) -> None:
        """Delete the item with the given key."""
        try:
            super().__delitem__(key)
        except KeyError:
            index_case_key = self._stream_handler.to_index_case(key)
            if super().__contains__(index_case_key):
                super().__delitem__(index_case_key)
                return
        else:
            # No failure. Key was deleted.
            return

        raise KeyError(key)

    def __contains__(self, key: object) -> bool:
        """Return whether the dictionary contains the given key."""
        assert isinstance(key, str), "Key must be a string."
        return super().__contains__(key) or super().__contains__(
            self._stream_handler.to_index_case(key)
        )

    def __iter__(self) -> Iterator[str]:
        """Return an iterator over the keys of the dictionary."""
        return iter(super().__iter__())

    def __len__(self) -> int:
        """Return the number of items in the dictionary."""
        return super().__len__()

    def __eq__(self, other: object) -> bool:
        """Return whether the StreamRecord is equal to the given dict or StreamRecord object."""
        if isinstance(other, StreamRecord):
            return dict(self) == dict(other)

        if isinstance(other, dict):
            return {k.lower(): v for k, v in self.items()} == {
                k.lower(): v for k, v in other.items()
            }
        return False

    def __hash__(self) -> int:  # type: ignore [override]  # Doesn't match superclass (dict)
        """Return the hash of the dictionary with keys sorted."""
        items = [(k, v) for k, v in self.items() if not isinstance(v, dict)]
        return hash(tuple(sorted(items)))