airbyte.records

PyAirbyte Records module.

Understanding record handling in PyAirbyte

PyAirbyte models record handling after Airbyte's "Destination V2" ("Dv2") record handling. This includes the implementation details below.

Field Name Normalization

  1. PyAirbyte normalizes top-level record keys to lowercase, replacing spaces and hyphens with underscores.
  2. PyAirbyte does not normalize nested keys on sub-properties.

For example, the following record:

{
    "My-Field": "value",
    "Nested": {
        "MySubField": "value"
    }
}

Would be normalized to:

{
    "my_field": "value",
    "nested": {
        "MySubField": "value"
    }
}
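The top-level rule can be sketched in a few lines of plain Python. This is an illustrative stand-in for the behavior described above, not PyAirbyte's actual `LowerCaseNormalizer`:

```python
def normalize_top_level(record: dict) -> dict:
    """Lowercase top-level keys, replacing spaces and hyphens with underscores.

    Nested dictionaries are intentionally left untouched, mirroring rule 2 above.
    """
    def normalize_key(key: str) -> str:
        return key.replace(" ", "_").replace("-", "_").lower()

    return {normalize_key(k): v for k, v in record.items()}


record = {"My-Field": "value", "Nested": {"MySubField": "value"}}
normalize_top_level(record)
# {'my_field': 'value', 'nested': {'MySubField': 'value'}}
```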

Table Name Normalization

Similar to column handling, PyAirbyte normalizes table names to the lowercase version of the stream name and may remove or normalize special characters.

Airbyte-Managed Metadata Columns

PyAirbyte adds the following columns to every record:

  • _airbyte_raw_id: A unique identifier for the record.
  • _airbyte_extracted_at: The time the record was extracted.
  • _airbyte_meta: A dictionary of extra metadata about the record.

The names of these columns are included in the airbyte.constants module for programmatic reference.

Schema Evolution

PyAirbyte supports a very basic form of schema evolution:

  1. Columns are always auto-added to cache tables whenever newly arriving properties are detected as not present in the cache table.
  2. Column types will not be modified or expanded to fit changed types in the source catalog.
    • If column types change, we recommend users manually alter the column types.
  3. At any time, users can run a full sync with a WriteStrategy of 'replace'. This will create a fresh table from scratch and then swap the old and new tables after table sync is complete.
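Rule 1 reduces to a set difference. The helper below is hypothetical (not a PyAirbyte API); it shows which columns a cache processor would add for a newly arriving record:

```python
def columns_to_add(cache_columns: set[str], record_keys: set[str]) -> set[str]:
    """Columns present in the arriving record but missing from the cache table.

    Per rule 2, existing column types are never altered; per rule 3, a full
    sync with the 'replace' write strategy rebuilds the table from scratch.
    """
    return record_keys - cache_columns


columns_to_add({"id", "name"}, {"id", "name", "email"})
# {'email'}
```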

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""PyAirbyte Records module.
  3
  4## Understanding record handling in PyAirbyte
  5
  6PyAirbyte models record handling after Airbyte's "Destination V2" ("Dv2") record handling. This
 7includes the implementation details below.
  8
  9### Field Name Normalization
 10
 111. PyAirbyte normalizes top-level record keys to lowercase, replacing spaces and hyphens with
 12   underscores.
 132. PyAirbyte does not normalize nested keys on sub-properties.
 14
 15For example, the following record:
 16
 17```json
 18{
 19
 20    "My-Field": "value",
 21    "Nested": {
 22        "MySubField": "value"
 23    }
 24}
 25```
 26
 27Would be normalized to:
 28
 29```json
 30{
 31    "my_field": "value",
 32    "nested": {
 33        "MySubField": "value"
 34    }
 35}
 36```
 37
 38### Table Name Normalization
 39
 40Similar to column handling, PyAirbyte normalizes table names to the lowercase version of the stream
 41name and may remove or normalize special characters.
 42
 43### Airbyte-Managed Metadata Columns
 44
 45PyAirbyte adds the following columns to every record:
 46
 47- `_airbyte_raw_id`: A unique identifier for the record.
 48- `_airbyte_extracted_at`: The time the record was extracted.
 49- `_airbyte_meta`: A dictionary of extra metadata about the record.
 50
 51The names of these columns are included in the `airbyte.constants` module for programmatic
 52reference.
 53
 54## Schema Evolution
 55
 56PyAirbyte supports a very basic form of schema evolution:
 57
 581. Columns are always auto-added to cache tables whenever newly arriving properties are detected
 59   as not present in the cache table.
 602. Column types will not be modified or expanded to fit changed types in the source catalog.
 61   - If column types change, we recommend users manually alter the column types.
 623. At any time, users can run a full sync with a `WriteStrategy` of 'replace'. This will create a
 63   fresh table from scratch and then swap the old and new tables after table sync is complete.
 64
 65---
 66
 67"""
 68
 69from __future__ import annotations
 70
 71from datetime import datetime
 72from typing import TYPE_CHECKING, Any
 73
 74import pytz
 75from uuid_extensions import uuid7str
 76
 77from airbyte._util.name_normalizers import LowerCaseNormalizer, NameNormalizerBase
 78from airbyte.constants import (
 79    AB_EXTRACTED_AT_COLUMN,
 80    AB_INTERNAL_COLUMNS,
 81    AB_META_COLUMN,
 82    AB_RAW_ID_COLUMN,
 83)
 84
 85
 86if TYPE_CHECKING:
 87    from collections.abc import Iterator
 88
 89    from airbyte_protocol.models import (
 90        AirbyteRecordMessage,
 91    )
 92
 93
 94class StreamRecordHandler:
 95    """A class to handle processing of StreamRecords.
 96
 97    This is a long-lived object that can be used to process multiple StreamRecords, which
 98    saves on memory and processing time by reusing the same object for all records of the same type.
 99    """
100
101    def __init__(
102        self,
103        *,
104        json_schema: dict,
105        normalizer: type[NameNormalizerBase] = LowerCaseNormalizer,
106        normalize_keys: bool = False,
107        prune_extra_fields: bool,
108    ) -> None:
109        """Initialize the dictionary with the given data.
110
111        Args:
112            json_schema: The JSON Schema definition for this stream.
113            normalizer: The normalizer to use when normalizing keys. If not provided, the
114                LowerCaseNormalizer will be used.
115            normalize_keys: If `True`, the keys will be normalized using the given normalizer.
116            prune_extra_fields: If `True`, unexpected fields will be removed.
117        """
118        self._expected_keys: list[str] = list(json_schema.get("properties", {}).keys())
119        self._normalizer: type[NameNormalizerBase] = normalizer
120        self._normalize_keys: bool = normalize_keys
121        self.prune_extra_fields: bool = prune_extra_fields
122
123        self.index_keys: list[str] = [
124            self._normalizer.normalize(key) if self._normalize_keys else key
125            for key in self._expected_keys
126        ]
127        self.normalized_keys: list[str] = [
128            self._normalizer.normalize(key) for key in self._expected_keys
129        ]
130        self.quick_lookup: dict[str, str]
131
132        for internal_col in AB_INTERNAL_COLUMNS:
133            if internal_col not in self._expected_keys:
134                self._expected_keys.append(internal_col)
135
136        # Store a lookup from normalized keys to pretty cased (originally cased) keys.
137        self._pretty_case_lookup: dict[str, str] = {
138            self._normalizer.normalize(pretty_case.lower()): pretty_case
139            for pretty_case in self._expected_keys
140        }
141        # Store a map from all key versions (normalized and pretty-cased) to their normalized
142        # version.
143        self.quick_lookup = {
144            key: self._normalizer.normalize(key)
145            if self._normalize_keys
146            else self.to_display_case(key)
147            for key in set(self._expected_keys) | set(self._pretty_case_lookup.values())
148        }
149
150    def to_display_case(self, key: str) -> str:
151        """Return the original case of the key."""
152        return self._pretty_case_lookup[self._normalizer.normalize(key)]
153
154    def to_index_case(self, key: str) -> str:
155        """Return the internal case of the key.
156
157        If `normalize_keys` is True, returns the normalized key.
158        Otherwise, return the original case ("pretty case") of the key.
159        """
160        try:
161            return self.quick_lookup[key]
162        except KeyError:
163            result = (
164                self._normalizer.normalize(key)
165                if self._normalize_keys
166                else self.to_display_case(key)
167            )
168            self.quick_lookup[key] = result
169            return result
170
171
172class StreamRecord(dict[str, Any]):
173    """The StreamRecord class is a case-aware, case-insensitive dictionary implementation.
174
175    It has these behaviors:
176    - When a key is retrieved, deleted, or checked for existence, it is always checked in a
177      case-insensitive manner.
178    - The original case is stored in a separate dictionary, so that the original case can be
179      retrieved when needed.
180    - Because it is subclassed from `dict`, the `StreamRecord` class can be passed as a normal
181      Python dictionary.
182    - In addition to the properties of the stream's records, the dictionary also stores the Airbyte
183      metadata columns: `_airbyte_raw_id`, `_airbyte_extracted_at`, and `_airbyte_meta`.
184
185    This behavior mirrors how a case-aware, case-insensitive SQL database would handle column
186    references.
187
188    There are two ways this class can store keys internally:
189    - If normalize_keys is True, the keys are normalized using the given normalizer.
190    - If normalize_keys is False, the original case of the keys is stored.
191
192    In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the
193    dictionary will be initialized with the given keys. If a key is not found in the input data, it
194    will be initialized with a value of None. When provided, the 'expected_keys' input will also
195    determine the original case of the keys.
196    """
197
198    def __init__(
199        self,
200        from_dict: dict,
201        *,
202        stream_record_handler: StreamRecordHandler,
203        with_internal_columns: bool = True,
204        extracted_at: datetime | None = None,
205    ) -> None:
206        """Initialize the dictionary with the given data.
207
208        Args:
209            from_dict: The dictionary to initialize the StreamRecord with.
210            stream_record_handler: The StreamRecordHandler to use for processing the record.
211            with_internal_columns: If `True`, the internal columns will be added to the record.
212            extracted_at: The time the record was extracted. If not provided, the current time will
213                be used.
214        """
215        self._stream_handler: StreamRecordHandler = stream_record_handler
216
217        # Start by initializing all values to None
218        self.update(dict.fromkeys(stream_record_handler.index_keys))
219
220        # Update the dictionary with the given data
221        if self._stream_handler.prune_extra_fields:
222            self.update(
223                {
224                    self._stream_handler.to_index_case(k): v
225                    for k, v in from_dict.items()
226                    if self._stream_handler.to_index_case(k) in self._stream_handler.index_keys
227                }
228            )
229        else:
230            self.update({self._stream_handler.to_index_case(k): v for k, v in from_dict.items()})
231
232        if with_internal_columns:
233            self.update(
234                {
235                    AB_RAW_ID_COLUMN: uuid7str(),
236                    AB_EXTRACTED_AT_COLUMN: extracted_at or datetime.now(pytz.utc),
237                    AB_META_COLUMN: {},
238                }
239            )
240
241    @classmethod
242    def from_record_message(
243        cls,
244        record_message: AirbyteRecordMessage,
245        *,
246        stream_record_handler: StreamRecordHandler,
247    ) -> StreamRecord:
248        """Return a StreamRecord from a RecordMessage."""
249        data_dict: dict[str, Any] = record_message.data.copy()
250        return cls(
251            from_dict=data_dict,
252            stream_record_handler=stream_record_handler,
253            with_internal_columns=True,
254            extracted_at=datetime.fromtimestamp(record_message.emitted_at / 1000, tz=pytz.utc),
255        )
256
257    def __getitem__(self, key: str) -> Any:  # noqa: ANN401
258        """Return the item with the given key."""
259        try:
260            return super().__getitem__(key)
261        except KeyError:
262            return super().__getitem__(self._stream_handler.to_index_case(key))
263
264    def __setitem__(self, key: str, value: Any) -> None:  # noqa: ANN401
265        """Set the item with the given key to the given value."""
266        index_case_key = self._stream_handler.to_index_case(key)
267        if (
268            self._stream_handler.prune_extra_fields
269            and index_case_key not in self._stream_handler.index_keys
270        ):
271            return
272
273        super().__setitem__(index_case_key, value)
274
275    def __delitem__(self, key: str) -> None:
276        """Delete the item with the given key."""
277        try:
278            super().__delitem__(key)
279        except KeyError:
280            index_case_key = self._stream_handler.to_index_case(key)
281            if super().__contains__(index_case_key):
282                super().__delitem__(index_case_key)
283                return
284        else:
285            # No failure. Key was deleted.
286            return
287
288        raise KeyError(key)
289
290    def __contains__(self, key: object) -> bool:
291        """Return whether the dictionary contains the given key."""
292        assert isinstance(key, str), "Key must be a string."
293        return super().__contains__(key) or super().__contains__(
294            self._stream_handler.to_index_case(key)
295        )
296
297    def __iter__(self) -> Iterator[str]:
298        """Return an iterator over the keys of the dictionary."""
299        return iter(super().__iter__())
300
301    def __len__(self) -> int:
302        """Return the number of items in the dictionary."""
303        return super().__len__()
304
305    def __eq__(self, other: object) -> bool:
306        """Return whether the StreamRecord is equal to the given dict or StreamRecord object."""
307        if isinstance(other, StreamRecord):
308            return dict(self) == dict(other)
309
310        if isinstance(other, dict):
311            return {k.lower(): v for k, v in self.items()} == {
312                k.lower(): v for k, v in other.items()
313            }
314        return False
315
316    def __hash__(self) -> int:  # type: ignore [override]  # Doesn't match superclass (dict)
317        """Return the hash of the dictionary with keys sorted."""
318        items = [(k, v) for k, v in self.items() if not isinstance(v, dict)]
319        return hash(tuple(sorted(items)))
class StreamRecordHandler:

A class to handle processing of StreamRecords.

This is a long-lived object that can be used to process multiple StreamRecords, which saves on memory and processing time by reusing the same object for all records of the same type.

StreamRecordHandler( *, json_schema: dict, normalizer: type[airbyte._util.name_normalizers.NameNormalizerBase] = <class 'airbyte._util.name_normalizers.LowerCaseNormalizer'>, normalize_keys: bool = False, prune_extra_fields: bool)

Initialize the dictionary with the given data.

Arguments:
  • json_schema: The JSON Schema definition for this stream.
  • normalizer: The normalizer to use when normalizing keys. If not provided, the LowerCaseNormalizer will be used.
  • normalize_keys: If True, the keys will be normalized using the given normalizer.
  • prune_extra_fields: If True, unexpected fields will be removed.
prune_extra_fields: bool
index_keys: list[str]
normalized_keys: list[str]
quick_lookup: dict[str, str]
def to_display_case(self, key: str) -> str:

Return the original case of the key.

def to_index_case(self, key: str) -> str:

Return the internal case of the key.

If normalize_keys is True, returns the normalized key. Otherwise, return the original case ("pretty case") of the key.
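The lazy caching in `to_index_case` is a plain try/except-KeyError memoization. The class below is a hypothetical, simplified stand-in for `StreamRecordHandler`'s key handling, using `str.lower` where the real class delegates to a `NameNormalizerBase` subclass:

```python
class KeyCasePlan:
    """Simplified sketch of StreamRecordHandler's key-case resolution."""

    def __init__(self, expected_keys: list[str], *, normalize_keys: bool) -> None:
        self._normalize_keys = normalize_keys
        # Map normalized keys back to their original ("pretty") case.
        self._pretty = {key.lower(): key for key in expected_keys}
        self._cache: dict[str, str] = {}  # plays the role of quick_lookup

    def to_index_case(self, key: str) -> str:
        try:
            return self._cache[key]  # fast path: this exact spelling was seen before
        except KeyError:
            result = key.lower() if self._normalize_keys else self._pretty[key.lower()]
            self._cache[key] = result  # memoize for subsequent records
            return result


plan = KeyCasePlan(["My-Field"], normalize_keys=False)
plan.to_index_case("MY-FIELD")  # 'My-Field' (original case restored)
```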

class StreamRecord(dict[str, typing.Any]):

The StreamRecord class is a case-aware, case-insensitive dictionary implementation.

It has these behaviors:

  • When a key is retrieved, deleted, or checked for existence, it is always checked in a case-insensitive manner.
  • The original case is stored in a separate dictionary, so that the original case can be retrieved when needed.
  • Because it is subclassed from dict, the StreamRecord class can be passed as a normal Python dictionary.
  • In addition to the properties of the stream's records, the dictionary also stores the Airbyte metadata columns: _airbyte_raw_id, _airbyte_extracted_at, and _airbyte_meta.

This behavior mirrors how a case-aware, case-insensitive SQL database would handle column references.

There are two ways this class can store keys internally:

  • If normalize_keys is True, the keys are normalized using the given normalizer.
  • If normalize_keys is False, the original case of the keys is stored.

In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the dictionary will be initialized with the given keys. If a key is not found in the input data, it will be initialized with a value of None. When provided, the 'expected_keys' input will also determine the original case of the keys.
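The case-aware, case-insensitive lookup described above can be illustrated with a small `dict` subclass. This is a simplified stand-in for `StreamRecord`, without the handler, field pruning, or metadata columns:

```python
from typing import Any


class CaseInsensitiveDict(dict):
    """Keys keep their original case, but lookups match case-insensitively."""

    def _resolve(self, key: str) -> str:
        for existing in self:
            if existing.lower() == key.lower():
                return existing
        return key

    def __getitem__(self, key: str) -> Any:
        return super().__getitem__(self._resolve(key))

    def __contains__(self, key: object) -> bool:
        return isinstance(key, str) and super().__contains__(self._resolve(key))


d = CaseInsensitiveDict({"My-Field": "value"})
d["MY-FIELD"]    # 'value'
"my-field" in d  # True
list(d)          # ['My-Field']  (original case preserved)
```

The real class avoids this linear scan by precomputing a `quick_lookup` map in the long-lived `StreamRecordHandler`, so the per-record cost stays constant.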

@classmethod
def from_record_message( cls, record_message: airbyte_protocol.models.airbyte_protocol.AirbyteRecordMessage, *, stream_record_handler: StreamRecordHandler) -> StreamRecord:

Return a StreamRecord from a RecordMessage.
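The only transformation involved is the timestamp conversion: the protocol's `emitted_at` field carries epoch milliseconds, which become a timezone-aware `datetime`:

```python
from datetime import datetime, timezone

emitted_at = 1700000000000  # epoch milliseconds, as carried by AirbyteRecordMessage
extracted_at = datetime.fromtimestamp(emitted_at / 1000, tz=timezone.utc)
extracted_at.isoformat()
# '2023-11-14T22:13:20+00:00'
```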
