airbyte_cdk.test.catalog_builder
1# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 2 3from typing import Any, Dict, List, Union, overload 4 5from airbyte_cdk.models import ( 6 ConfiguredAirbyteCatalog, 7 ConfiguredAirbyteStream, 8 ConfiguredAirbyteStreamSerializer, 9 SyncMode, 10) 11 12 13class ConfiguredAirbyteStreamBuilder: 14 def __init__(self) -> None: 15 self._stream: Dict[str, Any] = { 16 "stream": { 17 "name": "any name", 18 "json_schema": {}, 19 "supported_sync_modes": ["full_refresh", "incremental"], 20 "source_defined_primary_key": [["id"]], 21 }, 22 "primary_key": [["id"]], 23 "sync_mode": "full_refresh", 24 "destination_sync_mode": "overwrite", 25 } 26 27 def with_name(self, name: str) -> "ConfiguredAirbyteStreamBuilder": 28 self._stream["stream"]["name"] = name # type: ignore # we assume that self._stream["stream"] is a Dict[str, Any] 29 return self 30 31 def with_sync_mode(self, sync_mode: SyncMode) -> "ConfiguredAirbyteStreamBuilder": 32 self._stream["sync_mode"] = sync_mode.name 33 return self 34 35 def with_primary_key(self, pk: List[List[str]]) -> "ConfiguredAirbyteStreamBuilder": 36 self._stream["primary_key"] = pk 37 self._stream["stream"]["source_defined_primary_key"] = pk # type: ignore # we assume that self._stream["stream"] is a Dict[str, Any] 38 return self 39 40 def with_json_schema(self, json_schema: Dict[str, Any]) -> "ConfiguredAirbyteStreamBuilder": 41 self._stream["stream"]["json_schema"] = json_schema 42 return self 43 44 def build(self) -> ConfiguredAirbyteStream: 45 return ConfiguredAirbyteStreamSerializer.load(self._stream) 46 47 48class CatalogBuilder: 49 def __init__(self) -> None: 50 self._streams: List[ConfiguredAirbyteStreamBuilder] = [] 51 52 @overload 53 def with_stream(self, name: ConfiguredAirbyteStreamBuilder) -> "CatalogBuilder": ... 54 55 @overload 56 def with_stream(self, name: str, sync_mode: SyncMode) -> "CatalogBuilder": ... 57 58 def with_stream( 59 self, 60 name: Union[str, ConfiguredAirbyteStreamBuilder], 61 sync_mode: Union[SyncMode, None] = None, 62 ) -> "CatalogBuilder": 63 # As we are introducing a fully fledge ConfiguredAirbyteStreamBuilder, we would like to deprecate the previous interface 64 # with_stream(str, SyncMode) 65 66 # to avoid a breaking change, `name` needs to stay in the API but this can be either a name or a builder 67 name_or_builder = name 68 builder = ( 69 name_or_builder 70 if isinstance(name_or_builder, ConfiguredAirbyteStreamBuilder) 71 else ConfiguredAirbyteStreamBuilder() 72 .with_name(name_or_builder) 73 .with_sync_mode(sync_mode) 74 ) 75 self._streams.append(builder) 76 return self 77 78 def build(self) -> ConfiguredAirbyteCatalog: 79 return ConfiguredAirbyteCatalog( 80 streams=list(map(lambda builder: builder.build(), self._streams)) 81 )
class
ConfiguredAirbyteStreamBuilder:
14class ConfiguredAirbyteStreamBuilder: 15 def __init__(self) -> None: 16 self._stream: Dict[str, Any] = { 17 "stream": { 18 "name": "any name", 19 "json_schema": {}, 20 "supported_sync_modes": ["full_refresh", "incremental"], 21 "source_defined_primary_key": [["id"]], 22 }, 23 "primary_key": [["id"]], 24 "sync_mode": "full_refresh", 25 "destination_sync_mode": "overwrite", 26 } 27 28 def with_name(self, name: str) -> "ConfiguredAirbyteStreamBuilder": 29 self._stream["stream"]["name"] = name # type: ignore # we assume that self._stream["stream"] is a Dict[str, Any] 30 return self 31 32 def with_sync_mode(self, sync_mode: SyncMode) -> "ConfiguredAirbyteStreamBuilder": 33 self._stream["sync_mode"] = sync_mode.name 34 return self 35 36 def with_primary_key(self, pk: List[List[str]]) -> "ConfiguredAirbyteStreamBuilder": 37 self._stream["primary_key"] = pk 38 self._stream["stream"]["source_defined_primary_key"] = pk # type: ignore # we assume that self._stream["stream"] is a Dict[str, Any] 39 return self 40 41 def with_json_schema(self, json_schema: Dict[str, Any]) -> "ConfiguredAirbyteStreamBuilder": 42 self._stream["stream"]["json_schema"] = json_schema 43 return self 44 45 def build(self) -> ConfiguredAirbyteStream: 46 return ConfiguredAirbyteStreamSerializer.load(self._stream)
def
with_sync_mode( self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode) -> ConfiguredAirbyteStreamBuilder:
class
CatalogBuilder:
49class CatalogBuilder: 50 def __init__(self) -> None: 51 self._streams: List[ConfiguredAirbyteStreamBuilder] = [] 52 53 @overload 54 def with_stream(self, name: ConfiguredAirbyteStreamBuilder) -> "CatalogBuilder": ... 55 56 @overload 57 def with_stream(self, name: str, sync_mode: SyncMode) -> "CatalogBuilder": ... 58 59 def with_stream( 60 self, 61 name: Union[str, ConfiguredAirbyteStreamBuilder], 62 sync_mode: Union[SyncMode, None] = None, 63 ) -> "CatalogBuilder": 64 # As we are introducing a fully fledge ConfiguredAirbyteStreamBuilder, we would like to deprecate the previous interface 65 # with_stream(str, SyncMode) 66 67 # to avoid a breaking change, `name` needs to stay in the API but this can be either a name or a builder 68 name_or_builder = name 69 builder = ( 70 name_or_builder 71 if isinstance(name_or_builder, ConfiguredAirbyteStreamBuilder) 72 else ConfiguredAirbyteStreamBuilder() 73 .with_name(name_or_builder) 74 .with_sync_mode(sync_mode) 75 ) 76 self._streams.append(builder) 77 return self 78 79 def build(self) -> ConfiguredAirbyteCatalog: 80 return ConfiguredAirbyteCatalog( 81 streams=list(map(lambda builder: builder.build(), self._streams)) 82 )
def
with_stream( self, name: Union[str, ConfiguredAirbyteStreamBuilder], sync_mode: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode] = None) -> CatalogBuilder:
59 def with_stream( 60 self, 61 name: Union[str, ConfiguredAirbyteStreamBuilder], 62 sync_mode: Union[SyncMode, None] = None, 63 ) -> "CatalogBuilder": 64 # As we are introducing a fully fledge ConfiguredAirbyteStreamBuilder, we would like to deprecate the previous interface 65 # with_stream(str, SyncMode) 66 67 # to avoid a breaking change, `name` needs to stay in the API but this can be either a name or a builder 68 name_or_builder = name 69 builder = ( 70 name_or_builder 71 if isinstance(name_or_builder, ConfiguredAirbyteStreamBuilder) 72 else ConfiguredAirbyteStreamBuilder() 73 .with_name(name_or_builder) 74 .with_sync_mode(sync_mode) 75 ) 76 self._streams.append(builder) 77 return self