airbyte_cdk.test.catalog_builder

 1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 2
 3from typing import Any, Dict, List, Union, overload
 4
 5from airbyte_cdk.models import (
 6    ConfiguredAirbyteCatalog,
 7    ConfiguredAirbyteStream,
 8    ConfiguredAirbyteStreamSerializer,
 9    SyncMode,
10)
11
12
13class ConfiguredAirbyteStreamBuilder:
14    def __init__(self) -> None:
15        self._stream: Dict[str, Any] = {
16            "stream": {
17                "name": "any name",
18                "json_schema": {},
19                "supported_sync_modes": ["full_refresh", "incremental"],
20                "source_defined_primary_key": [["id"]],
21            },
22            "primary_key": [["id"]],
23            "sync_mode": "full_refresh",
24            "destination_sync_mode": "overwrite",
25        }
26
27    def with_name(self, name: str) -> "ConfiguredAirbyteStreamBuilder":
28        self._stream["stream"]["name"] = name  # type: ignore  # we assume that self._stream["stream"] is a Dict[str, Any]
29        return self
30
31    def with_sync_mode(self, sync_mode: SyncMode) -> "ConfiguredAirbyteStreamBuilder":
32        self._stream["sync_mode"] = sync_mode.name
33        return self
34
35    def with_primary_key(self, pk: List[List[str]]) -> "ConfiguredAirbyteStreamBuilder":
36        self._stream["primary_key"] = pk
37        self._stream["stream"]["source_defined_primary_key"] = pk  # type: ignore  # we assume that self._stream["stream"] is a Dict[str, Any]
38        return self
39
40    def with_json_schema(self, json_schema: Dict[str, Any]) -> "ConfiguredAirbyteStreamBuilder":
41        self._stream["stream"]["json_schema"] = json_schema
42        return self
43
44    def build(self) -> ConfiguredAirbyteStream:
45        return ConfiguredAirbyteStreamSerializer.load(self._stream)
46
47
48class CatalogBuilder:
49    def __init__(self) -> None:
50        self._streams: List[ConfiguredAirbyteStreamBuilder] = []
51
52    @overload
53    def with_stream(self, name: ConfiguredAirbyteStreamBuilder) -> "CatalogBuilder": ...
54
55    @overload
56    def with_stream(self, name: str, sync_mode: SyncMode) -> "CatalogBuilder": ...
57
58    def with_stream(
59        self,
60        name: Union[str, ConfiguredAirbyteStreamBuilder],
61        sync_mode: Union[SyncMode, None] = None,
62    ) -> "CatalogBuilder":
63        # As we are introducing a fully fledge ConfiguredAirbyteStreamBuilder, we would like to deprecate the previous interface
64        # with_stream(str, SyncMode)
65
66        # to avoid a breaking change, `name` needs to stay in the API but this can be either a name or a builder
67        name_or_builder = name
68        builder = (
69            name_or_builder
70            if isinstance(name_or_builder, ConfiguredAirbyteStreamBuilder)
71            else ConfiguredAirbyteStreamBuilder()
72            .with_name(name_or_builder)
73            .with_sync_mode(sync_mode)
74        )
75        self._streams.append(builder)
76        return self
77
78    def build(self) -> ConfiguredAirbyteCatalog:
79        return ConfiguredAirbyteCatalog(
80            streams=list(map(lambda builder: builder.build(), self._streams))
81        )
class ConfiguredAirbyteStreamBuilder:
14class ConfiguredAirbyteStreamBuilder:
15    def __init__(self) -> None:
16        self._stream: Dict[str, Any] = {
17            "stream": {
18                "name": "any name",
19                "json_schema": {},
20                "supported_sync_modes": ["full_refresh", "incremental"],
21                "source_defined_primary_key": [["id"]],
22            },
23            "primary_key": [["id"]],
24            "sync_mode": "full_refresh",
25            "destination_sync_mode": "overwrite",
26        }
27
28    def with_name(self, name: str) -> "ConfiguredAirbyteStreamBuilder":
29        self._stream["stream"]["name"] = name  # type: ignore  # we assume that self._stream["stream"] is a Dict[str, Any]
30        return self
31
32    def with_sync_mode(self, sync_mode: SyncMode) -> "ConfiguredAirbyteStreamBuilder":
33        self._stream["sync_mode"] = sync_mode.name
34        return self
35
36    def with_primary_key(self, pk: List[List[str]]) -> "ConfiguredAirbyteStreamBuilder":
37        self._stream["primary_key"] = pk
38        self._stream["stream"]["source_defined_primary_key"] = pk  # type: ignore  # we assume that self._stream["stream"] is a Dict[str, Any]
39        return self
40
41    def with_json_schema(self, json_schema: Dict[str, Any]) -> "ConfiguredAirbyteStreamBuilder":
42        self._stream["stream"]["json_schema"] = json_schema
43        return self
44
45    def build(self) -> ConfiguredAirbyteStream:
46        return ConfiguredAirbyteStreamSerializer.load(self._stream)
def with_name( self, name: str) -> ConfiguredAirbyteStreamBuilder:
28    def with_name(self, name: str) -> "ConfiguredAirbyteStreamBuilder":
29        self._stream["stream"]["name"] = name  # type: ignore  # we assume that self._stream["stream"] is a Dict[str, Any]
30        return self
def with_sync_mode( self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode) -> ConfiguredAirbyteStreamBuilder:
32    def with_sync_mode(self, sync_mode: SyncMode) -> "ConfiguredAirbyteStreamBuilder":
33        self._stream["sync_mode"] = sync_mode.name
34        return self
def with_primary_key( self, pk: List[List[str]]) -> ConfiguredAirbyteStreamBuilder:
36    def with_primary_key(self, pk: List[List[str]]) -> "ConfiguredAirbyteStreamBuilder":
37        self._stream["primary_key"] = pk
38        self._stream["stream"]["source_defined_primary_key"] = pk  # type: ignore  # we assume that self._stream["stream"] is a Dict[str, Any]
39        return self
def with_json_schema( self, json_schema: Dict[str, Any]) -> ConfiguredAirbyteStreamBuilder:
41    def with_json_schema(self, json_schema: Dict[str, Any]) -> "ConfiguredAirbyteStreamBuilder":
42        self._stream["stream"]["json_schema"] = json_schema
43        return self
def build( self) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteStream:
45    def build(self) -> ConfiguredAirbyteStream:
46        return ConfiguredAirbyteStreamSerializer.load(self._stream)
class CatalogBuilder:
49class CatalogBuilder:
50    def __init__(self) -> None:
51        self._streams: List[ConfiguredAirbyteStreamBuilder] = []
52
53    @overload
54    def with_stream(self, name: ConfiguredAirbyteStreamBuilder) -> "CatalogBuilder": ...
55
56    @overload
57    def with_stream(self, name: str, sync_mode: SyncMode) -> "CatalogBuilder": ...
58
59    def with_stream(
60        self,
61        name: Union[str, ConfiguredAirbyteStreamBuilder],
62        sync_mode: Union[SyncMode, None] = None,
63    ) -> "CatalogBuilder":
64        # As we are introducing a fully fledge ConfiguredAirbyteStreamBuilder, we would like to deprecate the previous interface
65        # with_stream(str, SyncMode)
66
67        # to avoid a breaking change, `name` needs to stay in the API but this can be either a name or a builder
68        name_or_builder = name
69        builder = (
70            name_or_builder
71            if isinstance(name_or_builder, ConfiguredAirbyteStreamBuilder)
72            else ConfiguredAirbyteStreamBuilder()
73            .with_name(name_or_builder)
74            .with_sync_mode(sync_mode)
75        )
76        self._streams.append(builder)
77        return self
78
79    def build(self) -> ConfiguredAirbyteCatalog:
80        return ConfiguredAirbyteCatalog(
81            streams=list(map(lambda builder: builder.build(), self._streams))
82        )
def with_stream( self, name: Union[str, ConfiguredAirbyteStreamBuilder], sync_mode: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode] = None) -> CatalogBuilder:
59    def with_stream(
60        self,
61        name: Union[str, ConfiguredAirbyteStreamBuilder],
62        sync_mode: Union[SyncMode, None] = None,
63    ) -> "CatalogBuilder":
64        # As we are introducing a fully fledge ConfiguredAirbyteStreamBuilder, we would like to deprecate the previous interface
65        # with_stream(str, SyncMode)
66
67        # to avoid a breaking change, `name` needs to stay in the API but this can be either a name or a builder
68        name_or_builder = name
69        builder = (
70            name_or_builder
71            if isinstance(name_or_builder, ConfiguredAirbyteStreamBuilder)
72            else ConfiguredAirbyteStreamBuilder()
73            .with_name(name_or_builder)
74            .with_sync_mode(sync_mode)
75        )
76        self._streams.append(builder)
77        return self
def build( self) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog:
79    def build(self) -> ConfiguredAirbyteCatalog:
80        return ConfiguredAirbyteCatalog(
81            streams=list(map(lambda builder: builder.build(), self._streams))
82        )