airbyte_cdk.destinations.vector_db_based.embedder

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional, Union, cast

from langchain.embeddings.cohere import CohereEmbeddings
from langchain.embeddings.fake import FakeEmbeddings
from langchain.embeddings.localai import LocalAIEmbeddings
from langchain.embeddings.openai import OpenAIEmbeddings

from airbyte_cdk.destinations.vector_db_based.config import (
    AzureOpenAIEmbeddingConfigModel,
    CohereEmbeddingConfigModel,
    FakeEmbeddingConfigModel,
    FromFieldEmbeddingConfigModel,
    OpenAICompatibleEmbeddingConfigModel,
    OpenAIEmbeddingConfigModel,
    ProcessingConfigModel,
)
from airbyte_cdk.destinations.vector_db_based.utils import create_chunks, format_exception
from airbyte_cdk.models import AirbyteRecordMessage
from airbyte_cdk.utils.traced_exception import AirbyteTracedException, FailureType

@dataclass
class Document:
    page_content: str
    record: AirbyteRecordMessage

class Embedder(ABC):
    """
    Embedder is an abstract class that defines the interface for embedding text.

    The Indexer class uses the Embedder class to internally embed text - each indexer is responsible for passing the text of all documents to the embedder and storing the resulting embeddings in the destination.
    The destination connector is responsible for creating an embedder instance and passing it to the writer.
    The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.
    """

    def __init__(self) -> None:
        pass

    @abstractmethod
    def check(self) -> Optional[str]:
        pass

    @abstractmethod
    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        Embed the text of each chunk and return the resulting embedding vectors.
        If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.
        """
        pass

    @property
    @abstractmethod
    def embedding_dimensions(self) -> int:
        pass

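If a destination needs an embedding service the CDK does not ship an embedder for, the interface above can be implemented directly. The snippet below is a minimal sketch and not part of the module: the service client and its embed method are hypothetical, while the Embedder interface, Document and format_exception come from this module.

class MyServiceEmbedder(Embedder):
    """Hypothetical embedder backed by an external embedding service."""

    def __init__(self, client, dimensions: int):
        super().__init__()
        self.client = client  # assumed to expose embed(texts: List[str]) -> List[List[float]]
        self.dimensions = dimensions

    def check(self) -> Optional[str]:
        # Return None on success, or a human-readable error message on failure.
        try:
            self.client.embed(["test"])
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return self.client.embed([document.page_content for document in documents])

    @property
    def embedding_dimensions(self) -> int:
        return self.dimensions
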
OPEN_AI_VECTOR_SIZE = 1536

OPEN_AI_TOKEN_LIMIT = 150_000  # limit of tokens per minute

class BaseOpenAIEmbedder(Embedder):
    def __init__(self, embeddings: OpenAIEmbeddings, chunk_size: int):
        super().__init__()
        self.embeddings = embeddings
        self.chunk_size = chunk_size

    def check(self) -> Optional[str]:
        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        Embed the text of each chunk and return the resulting embedding vectors.

        Because the OpenAI API fails if more than the per-minute token limit is sent at once, the request is split into batches and each batch is embedded separately.
        It's still possible to hit the rate limit between embed calls because the available token budget hasn't recovered between them,
        but the built-in retry mechanism of the OpenAI client handles that.
        """
        # Each chunk holds at most self.chunk_size tokens, so dividing the tokens-per-minute limit by the maximum tokens per chunk gives the number of documents that can be embedded at once without exhausting the limit in a single request.
        embedding_batch_size = OPEN_AI_TOKEN_LIMIT // self.chunk_size
        batches = create_chunks(documents, batch_size=embedding_batch_size)
        embeddings: List[Optional[List[float]]] = []
        for batch in batches:
            embeddings.extend(
                self.embeddings.embed_documents([chunk.page_content for chunk in batch])
            )
        return embeddings

    @property
    def embedding_dimensions(self) -> int:
        # vector size produced by the text-embedding-ada-002 model
        return OPEN_AI_VECTOR_SIZE

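To make the batch sizing concrete: assuming a chunk size of 1,000 tokens (an illustrative value; the real value comes from the processing config), each request carries at most 150 documents.

chunk_size = 1_000  # illustrative value standing in for ProcessingConfigModel.chunk_size
embedding_batch_size = OPEN_AI_TOKEN_LIMIT // chunk_size  # 150_000 // 1_000 == 150
# A list of 400 documents would therefore be embedded in three requests of 150, 150 and 100 documents.
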
class OpenAIEmbedder(BaseOpenAIEmbedder):
    def __init__(self, config: OpenAIEmbeddingConfigModel, chunk_size: int):
        super().__init__(
            OpenAIEmbeddings(  # type: ignore [call-arg]
                openai_api_key=config.openai_key, max_retries=15, disallowed_special=()
            ),
            chunk_size,
        )  # type: ignore

class AzureOpenAIEmbedder(BaseOpenAIEmbedder):
    def __init__(self, config: AzureOpenAIEmbeddingConfigModel, chunk_size: int):
        # As of 2023-09-27, the Azure OpenAI API has a limit of 16 documents per request
        super().__init__(
            OpenAIEmbeddings(  # type: ignore [call-arg]
                openai_api_key=config.openai_key,
                chunk_size=16,
                max_retries=15,
                openai_api_type="azure",
                openai_api_version="2023-05-15",
                openai_api_base=config.api_base,
                deployment=config.deployment,
                disallowed_special=(),
            ),
            chunk_size,
        )  # type: ignore

COHERE_VECTOR_SIZE = 1024

class CohereEmbedder(Embedder):
    def __init__(self, config: CohereEmbeddingConfigModel):
        super().__init__()
        # Client is set internally
        self.embeddings = CohereEmbeddings(
            cohere_api_key=config.cohere_key, model="embed-english-light-v2.0"
        )  # type: ignore

    def check(self) -> Optional[str]:
        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )

    @property
    def embedding_dimensions(self) -> int:
        # vector size produced by the embed-english-light-v2.0 model
        return COHERE_VECTOR_SIZE

class FakeEmbedder(Embedder):
    def __init__(self, config: FakeEmbeddingConfigModel):
        super().__init__()
        self.embeddings = FakeEmbeddings(size=OPEN_AI_VECTOR_SIZE)

    def check(self) -> Optional[str]:
        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )

    @property
    def embedding_dimensions(self) -> int:
        # use the same vector size as the OpenAI embeddings to keep it realistic
        return OPEN_AI_VECTOR_SIZE

CLOUD_DEPLOYMENT_MODE = "cloud"

class OpenAICompatibleEmbedder(Embedder):
    def __init__(self, config: OpenAICompatibleEmbeddingConfigModel):
        super().__init__()
        self.config = config
        # Client is set internally
        # Always set an API key, even if there is none defined in the config, because the validator fails otherwise. Embedding APIs that don't require an API key don't fail if one is provided, so this does not break usage.
        self.embeddings = LocalAIEmbeddings(
            model=config.model_name,
            openai_api_key=config.api_key or "dummy-api-key",
            openai_api_base=config.base_url,
            max_retries=15,
            disallowed_special=(),
        )  # type: ignore

    def check(self) -> Optional[str]:
        deployment_mode = os.environ.get("DEPLOYMENT_MODE", "")
        if (
            deployment_mode.casefold() == CLOUD_DEPLOYMENT_MODE
            and not self.config.base_url.startswith("https://")
        ):
            return "Base URL must start with https://"

        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )

    @property
    def embedding_dimensions(self) -> int:
        # vector size produced by the model
        return self.config.dimensions

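A sketch of wiring this embedder to a self-hosted, OpenAI-compatible endpoint. The field names match OpenAICompatibleEmbeddingConfigModel as used above; the concrete values and the keyword-argument construction of the pydantic model are illustrative assumptions.

compatible_config = OpenAICompatibleEmbeddingConfigModel(
    mode="openai_compatible",
    model_name="nomic-embed-text",  # hypothetical model served by the endpoint
    api_key="",  # may be empty; a dummy key is substituted internally
    base_url="https://embeddings.example.com/v1",  # must start with https:// when DEPLOYMENT_MODE is "cloud"
    dimensions=768,  # must match the vector size the model actually produces
)
embedder = OpenAICompatibleEmbedder(compatible_config)
error = embedder.check()  # None on success, otherwise a human-readable error message
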
class FromFieldEmbedder(Embedder):
    def __init__(self, config: FromFieldEmbeddingConfigModel):
        super().__init__()
        self.config = config

    def check(self) -> Optional[str]:
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        From each chunk, pull the embedding from the field specified in the config.
        Check that the field exists, is a list of numbers and has the correct size. If not, raise an AirbyteTracedException explaining the problem.
        """
        embeddings: List[Optional[List[float]]] = []
        for document in documents:
            data = document.record.data
            if self.config.field_name not in data:
                raise AirbyteTracedException(
                    internal_message="Embedding vector field not found",
                    failure_type=FailureType.config_error,
                    message=f"Record {str(data)[:250]}... in stream {document.record.stream} does not contain embedding vector field {self.config.field_name}. Please check your embedding configuration; the embedding vector field has to be set correctly on every record.",
                )
            field = data[self.config.field_name]
            if not isinstance(field, list) or not all(isinstance(x, (int, float)) for x in field):
                raise AirbyteTracedException(
                    internal_message="Embedding vector field not a list of numbers",
                    failure_type=FailureType.config_error,
                    message=f"Record {str(data)[:250]}... in stream {document.record.stream} does contain embedding vector field {self.config.field_name}, but it is not a list of numbers. Please check your embedding configuration; the embedding vector field has to be a list of numbers of length {self.config.dimensions} on every record.",
                )
            if len(field) != self.config.dimensions:
                raise AirbyteTracedException(
                    internal_message="Embedding vector field has wrong length",
                    failure_type=FailureType.config_error,
                    message=f"Record {str(data)[:250]}... in stream {document.record.stream} does contain embedding vector field {self.config.field_name}, but it has length {len(field)} instead of the configured {self.config.dimensions}. Please check your embedding configuration; the embedding vector field has to be a list of numbers of length {self.config.dimensions} on every record.",
                )
            embeddings.append(field)

        return embeddings

    @property
    def embedding_dimensions(self) -> int:
        return self.config.dimensions

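For illustration, a record that would pass the three checks above, assuming the config sets field_name to "embedding" and dimensions to 4 (both values are hypothetical):

record_data = {
    "id": 42,
    "text": "already-embedded content",
    "embedding": [0.12, -0.03, 0.88, 0.41],  # a list of numbers with exactly `dimensions` entries
}
# FromFieldEmbedder performs no API calls; it validates and returns
# record_data["embedding"] for each document.
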
embedder_map = {
    "openai": OpenAIEmbedder,
    "cohere": CohereEmbedder,
    "fake": FakeEmbedder,
    "azure_openai": AzureOpenAIEmbedder,
    "from_field": FromFieldEmbedder,
    "openai_compatible": OpenAICompatibleEmbedder,
}

def create_from_config(
    embedding_config: Union[
        AzureOpenAIEmbeddingConfigModel,
        CohereEmbeddingConfigModel,
        FakeEmbeddingConfigModel,
        FromFieldEmbeddingConfigModel,
        OpenAIEmbeddingConfigModel,
        OpenAICompatibleEmbeddingConfigModel,
    ],
    processing_config: ProcessingConfigModel,
) -> Embedder:
    if embedding_config.mode == "azure_openai" or embedding_config.mode == "openai":
        return cast(
            Embedder,
            embedder_map[embedding_config.mode](embedding_config, processing_config.chunk_size),
        )
    else:
        return cast(Embedder, embedder_map[embedding_config.mode](embedding_config))