airbyte_cdk.destinations.vector_db_based.embedder

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import os
  6from abc import ABC, abstractmethod
  7from dataclasses import dataclass
  8from typing import List, Optional, Union, cast
  9
 10from langchain_community.embeddings import (
 11    CohereEmbeddings,
 12    FakeEmbeddings,
 13    LocalAIEmbeddings,
 14    OpenAIEmbeddings,
 15)
 16
 17from airbyte_cdk.destinations.vector_db_based.config import (
 18    AzureOpenAIEmbeddingConfigModel,
 19    CohereEmbeddingConfigModel,
 20    FakeEmbeddingConfigModel,
 21    FromFieldEmbeddingConfigModel,
 22    OpenAICompatibleEmbeddingConfigModel,
 23    OpenAIEmbeddingConfigModel,
 24    ProcessingConfigModel,
 25)
 26from airbyte_cdk.destinations.vector_db_based.utils import create_chunks, format_exception
 27from airbyte_cdk.models import AirbyteRecordMessage
 28from airbyte_cdk.utils.traced_exception import AirbyteTracedException, FailureType
 29
 30
 31@dataclass
 32class Document:
 33    page_content: str
 34    record: AirbyteRecordMessage
 35
 36
 37class Embedder(ABC):
 38    """
 39    Embedder is an abstract class that defines the interface for embedding text.
 40
 41    The Indexer class uses the Embedder class to internally embed text - each indexer is responsible to pass the text of all documents to the embedder and store the resulting embeddings in the destination.
 42    The destination connector is responsible to create an embedder instance and pass it to the writer.
 43    The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.
 44    """
 45
 46    def __init__(self) -> None:
 47        pass
 48
 49    @abstractmethod
 50    def check(self) -> Optional[str]:
 51        pass
 52
 53    @abstractmethod
 54    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
 55        """
 56        Embed the text of each chunk and return the resulting embedding vectors.
 57        If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.
 58        """
 59        pass
 60
 61    @property
 62    @abstractmethod
 63    def embedding_dimensions(self) -> int:
 64        pass
 65
 66
 67OPEN_AI_VECTOR_SIZE = 1536
 68
 69OPEN_AI_TOKEN_LIMIT = 150_000  # limit of tokens per minute
 70
 71
 72class BaseOpenAIEmbedder(Embedder):
 73    def __init__(self, embeddings: OpenAIEmbeddings, chunk_size: int):
 74        super().__init__()
 75        self.embeddings = embeddings
 76        self.chunk_size = chunk_size
 77
 78    def check(self) -> Optional[str]:
 79        try:
 80            self.embeddings.embed_query("test")
 81        except Exception as e:
 82            return format_exception(e)
 83        return None
 84
 85    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
 86        """
 87        Embed the text of each chunk and return the resulting embedding vectors.
 88
 89        As the OpenAI API will fail if more than the per-minute limit worth of tokens is sent at once, we split the request into batches and embed each batch separately.
 90        It's still possible to run into the rate limit between each embed call because the available token budget hasn't recovered between the calls,
 91        but the built-in retry mechanism of the OpenAI client handles that.
 92        """
 93        # Each chunk can hold at most self.chunk_size tokens, so tokens-per-minute by maximum tokens per chunk is the number of documents that can be embedded at once without exhausting the limit in a single request
 94        embedding_batch_size = OPEN_AI_TOKEN_LIMIT // self.chunk_size
 95        batches = create_chunks(documents, batch_size=embedding_batch_size)
 96        embeddings: List[Optional[List[float]]] = []
 97        for batch in batches:
 98            embeddings.extend(
 99                self.embeddings.embed_documents([chunk.page_content for chunk in batch])
100            )
101        return embeddings
102
103    @property
104    def embedding_dimensions(self) -> int:
105        # vector size produced by text-embedding-ada-002 model
106        return OPEN_AI_VECTOR_SIZE
107
108
class OpenAIEmbedder(BaseOpenAIEmbedder):
    """Embedder that calls the OpenAI embeddings API with the API key from the config."""

    def __init__(self, config: OpenAIEmbeddingConfigModel, chunk_size: int):
        client = OpenAIEmbeddings(  # type: ignore [call-arg]
            openai_api_key=config.openai_key,
            max_retries=15,
            disallowed_special=(),
        )
        super().__init__(client, chunk_size)
117
118
class AzureOpenAIEmbedder(BaseOpenAIEmbedder):
    """Embedder that calls an Azure OpenAI deployment through LangChain's OpenAIEmbeddings client."""

    def __init__(self, config: AzureOpenAIEmbeddingConfigModel, chunk_size: int):
        # Azure OpenAI API has — as of 20230927 — a limit of 16 documents per request, hence the client's
        # chunk_size=16 (documents per request). The chunk_size argument (tokens per chunk) goes to the base class.
        azure_client = OpenAIEmbeddings(  # type: ignore [call-arg]
            openai_api_key=config.openai_key,
            chunk_size=16,
            max_retries=15,
            openai_api_type="azure",
            openai_api_version="2023-05-15",
            openai_api_base=config.api_base,
            deployment=config.deployment,
            disallowed_special=(),
        )
        super().__init__(azure_client, chunk_size)
135
136
COHERE_VECTOR_SIZE = 1024  # length of vectors from Cohere's embed-english-light-v2.0, the model CohereEmbedder uses
138
139
class CohereEmbedder(Embedder):
    """Embedder backed by Cohere's ``embed-english-light-v2.0`` model via LangChain."""

    def __init__(self, config: CohereEmbeddingConfigModel):
        """Create a LangChain Cohere client for the embed-english-light-v2.0 model using the configured key."""
        super().__init__()
        # Client is set internally
        self.embeddings = CohereEmbeddings(
            cohere_api_key=config.cohere_key,
            model="embed-english-light-v2.0",
            user_agent="airbyte-cdk",
        )  # type: ignore

    def check(self) -> Optional[str]:
        """Embed a probe string; return the formatted error on failure, otherwise None."""
        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """Embed the page content of all documents with a single call to the client."""
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )

    @property
    def embedding_dimensions(self) -> int:
        # vector size produced by Cohere's embed-english-light-v2.0 model
        return COHERE_VECTOR_SIZE
167
168
class FakeEmbedder(Embedder):
    """Embedder backed by LangChain's FakeEmbeddings, producing OpenAI-sized vectors without a real model."""

    def __init__(self, config: FakeEmbeddingConfigModel):
        super().__init__()
        # config is not used; the fake vectors mirror the OpenAI vector size.
        self.embeddings = FakeEmbeddings(size=OPEN_AI_VECTOR_SIZE)

    def check(self) -> Optional[str]:
        """Embed a probe string; return the formatted error on failure, otherwise None."""
        try:
            self.embeddings.embed_query("test")
        except Exception as err:
            return format_exception(err)
        else:
            return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """Embed the page content of all documents with a single call to the client."""
        texts = [doc.page_content for doc in documents]
        return cast(List[Optional[List[float]]], self.embeddings.embed_documents(texts))

    @property
    def embedding_dimensions(self) -> int:
        """Same length as OpenAI vectors, to keep the fake realistic."""
        return OPEN_AI_VECTOR_SIZE
190        return OPEN_AI_VECTOR_SIZE
191
192
CLOUD_DEPLOYMENT_MODE = "cloud"  # DEPLOYMENT_MODE value (compared case-insensitively) under which https base URLs are enforced
194
195
class OpenAICompatibleEmbedder(Embedder):
    """Embedder for any service exposing an OpenAI-compatible embeddings API at a configurable base URL."""

    def __init__(self, config: OpenAICompatibleEmbeddingConfigModel):
        super().__init__()
        self.config = config
        # Client is set internally
        # Always set an API key, even if the config defines none, because the validator fails otherwise.
        # Embedding APIs that don't require a key don't fail if one is provided, so this does not break usage.
        self.embeddings = LocalAIEmbeddings(
            model=config.model_name,
            openai_api_key=config.api_key or "dummy-api-key",
            openai_api_base=config.base_url,
            max_retries=15,
            disallowed_special=(),
        )  # type: ignore

    def check(self) -> Optional[str]:
        """
        Validate the base URL and probe the endpoint.

        When the DEPLOYMENT_MODE environment variable equals "cloud" (case-insensitively), the base URL
        must use https. Otherwise a probe string is embedded; the formatted error is returned on failure
        and None on success.
        """
        deployment_mode = os.environ.get("DEPLOYMENT_MODE", "")
        # URL schemes are case-insensitive (RFC 3986, section 3.1), so "HTTPS://..." also counts as https.
        if (
            deployment_mode.casefold() == CLOUD_DEPLOYMENT_MODE
            and not self.config.base_url.lower().startswith("https://")
        ):
            return "Base URL must start with https://"

        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """Embed the page content of all documents with a single call to the client."""
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )

    @property
    def embedding_dimensions(self) -> int:
        # vector size produced by the model, as configured by the user
        return self.config.dimensions
233        return self.config.dimensions
234
235
class FromFieldEmbedder(Embedder):
    """Embedder that reads precomputed vectors from a field of each record instead of calling a model."""

    def __init__(self, config: FromFieldEmbeddingConfigModel):
        super().__init__()
        self.config = config

    def check(self) -> Optional[str]:
        """Nothing external to verify, so the check always succeeds."""
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        From each chunk, pull the embedding from the record field named in the config.

        Raises:
            AirbyteTracedException (config_error): if the field is missing, is not a list of numbers
            (booleans do not count as numbers), or its length differs from ``config.dimensions``.
        """
        embeddings: List[Optional[List[float]]] = []
        for document in documents:
            data = document.record.data
            if self.config.field_name not in data:
                raise AirbyteTracedException(
                    internal_message="Embedding vector field not found",
                    failure_type=FailureType.config_error,
                    message=f"Record {str(data)[:250]}... in stream {document.record.stream}  does not contain embedding vector field {self.config.field_name}. Please check your embedding configuration, the embedding vector field has to be set correctly on every record.",
                )
            field = data[self.config.field_name]
            # bool is a subclass of int, so exclude it explicitly: true/false are not vector components.
            if not isinstance(field, list) or not all(
                isinstance(x, (int, float)) and not isinstance(x, bool) for x in field
            ):
                raise AirbyteTracedException(
                    internal_message="Embedding vector field not a list of numbers",
                    failure_type=FailureType.config_error,
                    message=f"Record {str(data)[:250]}...  in stream {document.record.stream} does contain embedding vector field {self.config.field_name}, but it is not a list of numbers. Please check your embedding configuration, the embedding vector field has to be a list of numbers of length {self.config.dimensions} on every record.",
                )
            if len(field) != self.config.dimensions:
                raise AirbyteTracedException(
                    internal_message="Embedding vector field has wrong length",
                    failure_type=FailureType.config_error,
                    message=f"Record {str(data)[:250]}...  in stream {document.record.stream} does contain embedding vector field {self.config.field_name}, but it has length {len(field)} instead of the configured {self.config.dimensions}. Please check your embedding configuration, the embedding vector field has to be a list of numbers of length {self.config.dimensions} on every record.",
                )
            embeddings.append(field)

        return embeddings

    @property
    def embedding_dimensions(self) -> int:
        return self.config.dimensions
277        return self.config.dimensions
278
279
# Maps the ``mode`` of an embedding config to the Embedder subclass create_from_config instantiates for it.
embedder_map = {
    "openai": OpenAIEmbedder,  # also receives processing chunk_size
    "cohere": CohereEmbedder,
    "fake": FakeEmbedder,
    "azure_openai": AzureOpenAIEmbedder,  # also receives processing chunk_size
    "from_field": FromFieldEmbedder,
    "openai_compatible": OpenAICompatibleEmbedder,
}
288
289
def create_from_config(
    embedding_config: Union[
        AzureOpenAIEmbeddingConfigModel,
        CohereEmbeddingConfigModel,
        FakeEmbeddingConfigModel,
        FromFieldEmbeddingConfigModel,
        OpenAIEmbeddingConfigModel,
        OpenAICompatibleEmbeddingConfigModel,
    ],
    processing_config: ProcessingConfigModel,
) -> Embedder:
    """
    Instantiate the embedder registered in ``embedder_map`` for ``embedding_config.mode``.

    The OpenAI and Azure OpenAI embedders also receive ``processing_config.chunk_size`` to size their
    request batches; all other embedders are built from the embedding config alone.
    A mode missing from ``embedder_map`` raises KeyError.
    """
    embedder_cls = embedder_map[embedding_config.mode]
    if embedding_config.mode in ("azure_openai", "openai"):
        return cast(Embedder, embedder_cls(embedding_config, processing_config.chunk_size))
    return cast(Embedder, embedder_cls(embedding_config))
@dataclass
class Document:
@dataclass
class Document:
    """A piece of text to embed, paired with the Airbyte record it was produced from."""

    page_content: str  # the text handed to the embedding model
    record: AirbyteRecordMessage  # originating record; FromFieldEmbedder reads vectors from its data
Document( page_content: str, record: airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteRecordMessage)
page_content: str
record: airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteRecordMessage
class Embedder(abc.ABC):
class Embedder(ABC):
    """
    Abstract interface for turning document text into embedding vectors.

    Each indexer passes the text of its documents to an embedder and stores the returned vectors in the destination.
    The destination connector creates the embedder instance and hands it to the writer.
    The CDK provides the basic embedders every destination should support; destinations with special needs may implement custom ones.
    """

    def __init__(self) -> None:
        """Initialize the base embedder; it holds no state of its own."""

    @abstractmethod
    def check(self) -> Optional[str]:
        """Return an error message if the embedder cannot be used, otherwise None."""

    @abstractmethod
    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        Return the embedding vector for the text of each document.

        A document that cannot be embedded, or is configured not to be, gets None instead of a vector.
        """

    @property
    @abstractmethod
    def embedding_dimensions(self) -> int:
        """Number of components in each vector this embedder produces."""

Embedder is an abstract class that defines the interface for embedding text.

The Indexer class uses the Embedder class to internally embed text - each indexer is responsible to pass the text of all documents to the embedder and store the resulting embeddings in the destination. The destination connector is responsible to create an embedder instance and pass it to the writer. The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.

@abstractmethod
def check(self) -> Optional[str]:
50    @abstractmethod
51    def check(self) -> Optional[str]:
52        pass
@abstractmethod
def embed_documents( self, documents: List[Document]) -> List[Optional[List[float]]]:
54    @abstractmethod
55    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
56        """
57        Embed the text of each chunk and return the resulting embedding vectors.
58        If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.
59        """
60        pass

Embed the text of each chunk and return the resulting embedding vectors. If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.

embedding_dimensions: int
62    @property
63    @abstractmethod
64    def embedding_dimensions(self) -> int:
65        pass
# Length of the vectors produced by OpenAI's text-embedding-ada-002 model.
OPEN_AI_VECTOR_SIZE = 1536
# Tokens-per-minute budget; BaseOpenAIEmbedder sizes its request batches so one request stays within it.
OPEN_AI_TOKEN_LIMIT = 150_000
class BaseOpenAIEmbedder(Embedder):
 73class BaseOpenAIEmbedder(Embedder):
 74    def __init__(self, embeddings: OpenAIEmbeddings, chunk_size: int):
 75        super().__init__()
 76        self.embeddings = embeddings
 77        self.chunk_size = chunk_size
 78
 79    def check(self) -> Optional[str]:
 80        try:
 81            self.embeddings.embed_query("test")
 82        except Exception as e:
 83            return format_exception(e)
 84        return None
 85
 86    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
 87        """
 88        Embed the text of each chunk and return the resulting embedding vectors.
 89
 90        As the OpenAI API will fail if more than the per-minute limit worth of tokens is sent at once, we split the request into batches and embed each batch separately.
 91        It's still possible to run into the rate limit between each embed call because the available token budget hasn't recovered between the calls,
 92        but the built-in retry mechanism of the OpenAI client handles that.
 93        """
 94        # Each chunk can hold at most self.chunk_size tokens, so tokens-per-minute by maximum tokens per chunk is the number of documents that can be embedded at once without exhausting the limit in a single request
 95        embedding_batch_size = OPEN_AI_TOKEN_LIMIT // self.chunk_size
 96        batches = create_chunks(documents, batch_size=embedding_batch_size)
 97        embeddings: List[Optional[List[float]]] = []
 98        for batch in batches:
 99            embeddings.extend(
100                self.embeddings.embed_documents([chunk.page_content for chunk in batch])
101            )
102        return embeddings
103
104    @property
105    def embedding_dimensions(self) -> int:
106        # vector size produced by text-embedding-ada-002 model
107        return OPEN_AI_VECTOR_SIZE

Embedder is an abstract class that defines the interface for embedding text.

The Indexer class uses the Embedder class to internally embed text - each indexer is responsible to pass the text of all documents to the embedder and store the resulting embeddings in the destination. The destination connector is responsible to create an embedder instance and pass it to the writer. The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.

BaseOpenAIEmbedder( embeddings: langchain_community.embeddings.openai.OpenAIEmbeddings, chunk_size: int)
74    def __init__(self, embeddings: OpenAIEmbeddings, chunk_size: int):
75        super().__init__()
76        self.embeddings = embeddings
77        self.chunk_size = chunk_size
embeddings
chunk_size
def check(self) -> Optional[str]:
79    def check(self) -> Optional[str]:
80        try:
81            self.embeddings.embed_query("test")
82        except Exception as e:
83            return format_exception(e)
84        return None
def embed_documents( self, documents: List[Document]) -> List[Optional[List[float]]]:
 86    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
 87        """
 88        Embed the text of each chunk and return the resulting embedding vectors.
 89
 90        As the OpenAI API will fail if more than the per-minute limit worth of tokens is sent at once, we split the request into batches and embed each batch separately.
 91        It's still possible to run into the rate limit between each embed call because the available token budget hasn't recovered between the calls,
 92        but the built-in retry mechanism of the OpenAI client handles that.
 93        """
 94        # Each chunk can hold at most self.chunk_size tokens, so tokens-per-minute by maximum tokens per chunk is the number of documents that can be embedded at once without exhausting the limit in a single request
 95        embedding_batch_size = OPEN_AI_TOKEN_LIMIT // self.chunk_size
 96        batches = create_chunks(documents, batch_size=embedding_batch_size)
 97        embeddings: List[Optional[List[float]]] = []
 98        for batch in batches:
 99            embeddings.extend(
100                self.embeddings.embed_documents([chunk.page_content for chunk in batch])
101            )
102        return embeddings

Embed the text of each chunk and return the resulting embedding vectors.

As the OpenAI API will fail if more than the per-minute limit worth of tokens is sent at once, we split the request into batches and embed each batch separately. It's still possible to run into the rate limit between each embed call because the available token budget hasn't recovered between the calls, but the built-in retry mechanism of the OpenAI client handles that.

embedding_dimensions: int
104    @property
105    def embedding_dimensions(self) -> int:
106        # vector size produced by text-embedding-ada-002 model
107        return OPEN_AI_VECTOR_SIZE
class OpenAIEmbedder(BaseOpenAIEmbedder):
class OpenAIEmbedder(BaseOpenAIEmbedder):
    """Embedder that calls the OpenAI embeddings API with the API key from the config."""

    def __init__(self, config: OpenAIEmbeddingConfigModel, chunk_size: int):
        client = OpenAIEmbeddings(  # type: ignore [call-arg]
            openai_api_key=config.openai_key,
            max_retries=15,
            disallowed_special=(),
        )
        super().__init__(client, chunk_size)

Embedder is an abstract class that defines the interface for embedding text.

The Indexer class uses the Embedder class to internally embed text - each indexer is responsible to pass the text of all documents to the embedder and store the resulting embeddings in the destination. The destination connector is responsible to create an embedder instance and pass it to the writer. The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.

OpenAIEmbedder( config: airbyte_cdk.destinations.vector_db_based.OpenAIEmbeddingConfigModel, chunk_size: int)
111    def __init__(self, config: OpenAIEmbeddingConfigModel, chunk_size: int):
112        super().__init__(
113            OpenAIEmbeddings(  # type: ignore [call-arg]
114                openai_api_key=config.openai_key, max_retries=15, disallowed_special=()
115            ),
116            chunk_size,
117        )  # type: ignore
class AzureOpenAIEmbedder(BaseOpenAIEmbedder):
class AzureOpenAIEmbedder(BaseOpenAIEmbedder):
    """Embedder that calls an Azure OpenAI deployment through LangChain's OpenAIEmbeddings client."""

    def __init__(self, config: AzureOpenAIEmbeddingConfigModel, chunk_size: int):
        # Azure OpenAI API has — as of 20230927 — a limit of 16 documents per request, hence the client's
        # chunk_size=16 (documents per request). The chunk_size argument (tokens per chunk) goes to the base class.
        azure_client = OpenAIEmbeddings(  # type: ignore [call-arg]
            openai_api_key=config.openai_key,
            chunk_size=16,
            max_retries=15,
            openai_api_type="azure",
            openai_api_version="2023-05-15",
            openai_api_base=config.api_base,
            deployment=config.deployment,
            disallowed_special=(),
        )
        super().__init__(azure_client, chunk_size)

Embedder is an abstract class that defines the interface for embedding text.

The Indexer class uses the Embedder class to internally embed text - each indexer is responsible to pass the text of all documents to the embedder and store the resulting embeddings in the destination. The destination connector is responsible to create an embedder instance and pass it to the writer. The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.

AzureOpenAIEmbedder( config: airbyte_cdk.destinations.vector_db_based.AzureOpenAIEmbeddingConfigModel, chunk_size: int)
121    def __init__(self, config: AzureOpenAIEmbeddingConfigModel, chunk_size: int):
122        # Azure OpenAI API has — as of 20230927 — a limit of 16 documents per request
123        super().__init__(
124            OpenAIEmbeddings(  # type: ignore [call-arg]
125                openai_api_key=config.openai_key,
126                chunk_size=16,
127                max_retries=15,
128                openai_api_type="azure",
129                openai_api_version="2023-05-15",
130                openai_api_base=config.api_base,
131                deployment=config.deployment,
132                disallowed_special=(),
133            ),
134            chunk_size,
135        )  # type: ignore
COHERE_VECTOR_SIZE = 1024  # length of vectors from Cohere's embed-english-light-v2.0, the model CohereEmbedder uses
class CohereEmbedder(Embedder):
class CohereEmbedder(Embedder):
    """Embedder backed by Cohere's ``embed-english-light-v2.0`` model via LangChain."""

    def __init__(self, config: CohereEmbeddingConfigModel):
        """Create a LangChain Cohere client for the embed-english-light-v2.0 model using the configured key."""
        super().__init__()
        # Client is set internally
        self.embeddings = CohereEmbeddings(
            cohere_api_key=config.cohere_key,
            model="embed-english-light-v2.0",
            user_agent="airbyte-cdk",
        )  # type: ignore

    def check(self) -> Optional[str]:
        """Embed a probe string; return the formatted error on failure, otherwise None."""
        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """Embed the page content of all documents with a single call to the client."""
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )

    @property
    def embedding_dimensions(self) -> int:
        # vector size produced by Cohere's embed-english-light-v2.0 model
        return COHERE_VECTOR_SIZE

Embedder is an abstract class that defines the interface for embedding text.

The Indexer class uses the Embedder class to internally embed text - each indexer is responsible to pass the text of all documents to the embedder and store the resulting embeddings in the destination. The destination connector is responsible to create an embedder instance and pass it to the writer. The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.

142    def __init__(self, config: CohereEmbeddingConfigModel):
143        super().__init__()
144        # Client is set internally
145        self.embeddings = CohereEmbeddings(
146            cohere_api_key=config.cohere_key,
147            model="embed-english-light-v2.0",
148            user_agent="airbyte-cdk",
149        )  # type: ignore
embeddings
def check(self) -> Optional[str]:
151    def check(self) -> Optional[str]:
152        try:
153            self.embeddings.embed_query("test")
154        except Exception as e:
155            return format_exception(e)
156        return None
def embed_documents( self, documents: List[Document]) -> List[Optional[List[float]]]:
158    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
159        return cast(
160            List[Optional[List[float]]],
161            self.embeddings.embed_documents([document.page_content for document in documents]),
162        )

Embed the text of each chunk and return the resulting embedding vectors. If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.

embedding_dimensions: int
164    @property
165    def embedding_dimensions(self) -> int:
166        # vector size produced by text-embedding-ada-002 model
167        return COHERE_VECTOR_SIZE
class FakeEmbedder(Embedder):
class FakeEmbedder(Embedder):
    """Embedder backed by LangChain's FakeEmbeddings, producing OpenAI-sized vectors without a real model."""

    def __init__(self, config: FakeEmbeddingConfigModel):
        super().__init__()
        # config is not used; the fake vectors mirror the OpenAI vector size.
        self.embeddings = FakeEmbeddings(size=OPEN_AI_VECTOR_SIZE)

    def check(self) -> Optional[str]:
        """Embed a probe string; return the formatted error on failure, otherwise None."""
        try:
            self.embeddings.embed_query("test")
        except Exception as err:
            return format_exception(err)
        else:
            return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """Embed the page content of all documents with a single call to the client."""
        texts = [doc.page_content for doc in documents]
        return cast(List[Optional[List[float]]], self.embeddings.embed_documents(texts))

    @property
    def embedding_dimensions(self) -> int:
        """Same length as OpenAI vectors, to keep the fake realistic."""
        return OPEN_AI_VECTOR_SIZE

Embedder is an abstract class that defines the interface for embedding text.

The Indexer class uses the Embedder class to internally embed text - each indexer is responsible to pass the text of all documents to the embedder and store the resulting embeddings in the destination. The destination connector is responsible to create an embedder instance and pass it to the writer. The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.

171    def __init__(self, config: FakeEmbeddingConfigModel):
172        super().__init__()
173        self.embeddings = FakeEmbeddings(size=OPEN_AI_VECTOR_SIZE)
embeddings
def check(self) -> Optional[str]:
175    def check(self) -> Optional[str]:
176        try:
177            self.embeddings.embed_query("test")
178        except Exception as e:
179            return format_exception(e)
180        return None
def embed_documents( self, documents: List[Document]) -> List[Optional[List[float]]]:
182    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
183        return cast(
184            List[Optional[List[float]]],
185            self.embeddings.embed_documents([document.page_content for document in documents]),
186        )

Embed the text of each chunk and return the resulting embedding vectors. If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.

embedding_dimensions: int
188    @property
189    def embedding_dimensions(self) -> int:
190        # use same vector size as for OpenAI embeddings to keep it realistic
191        return OPEN_AI_VECTOR_SIZE
CLOUD_DEPLOYMENT_MODE = "cloud"  # DEPLOYMENT_MODE value (compared case-insensitively) under which https base URLs are enforced
class OpenAICompatibleEmbedder(Embedder):
class OpenAICompatibleEmbedder(Embedder):
    """Embedder for any service exposing an OpenAI-compatible embeddings API at a configurable base URL."""

    def __init__(self, config: OpenAICompatibleEmbeddingConfigModel):
        super().__init__()
        self.config = config
        # Client is set internally
        # Always set an API key, even if the config defines none, because the validator fails otherwise.
        # Embedding APIs that don't require a key don't fail if one is provided, so this does not break usage.
        self.embeddings = LocalAIEmbeddings(
            model=config.model_name,
            openai_api_key=config.api_key or "dummy-api-key",
            openai_api_base=config.base_url,
            max_retries=15,
            disallowed_special=(),
        )  # type: ignore

    def check(self) -> Optional[str]:
        """
        Validate the base URL and probe the endpoint.

        When the DEPLOYMENT_MODE environment variable equals "cloud" (case-insensitively), the base URL
        must use https. Otherwise a probe string is embedded; the formatted error is returned on failure
        and None on success.
        """
        deployment_mode = os.environ.get("DEPLOYMENT_MODE", "")
        # URL schemes are case-insensitive (RFC 3986, section 3.1), so "HTTPS://..." also counts as https.
        if (
            deployment_mode.casefold() == CLOUD_DEPLOYMENT_MODE
            and not self.config.base_url.lower().startswith("https://")
        ):
            return "Base URL must start with https://"

        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """Embed the page content of all documents with a single call to the client."""
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )

    @property
    def embedding_dimensions(self) -> int:
        # vector size produced by the model, as configured by the user
        return self.config.dimensions

Embedder is an abstract class that defines the interface for embedding text.

The Indexer class uses the Embedder class to internally embed text - each indexer is responsible to pass the text of all documents to the embedder and store the resulting embeddings in the destination. The destination connector is responsible to create an embedder instance and pass it to the writer. The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.

198    def __init__(self, config: OpenAICompatibleEmbeddingConfigModel):
199        super().__init__()
200        self.config = config
201        # Client is set internally
202        # Always set an API key even if there is none defined in the config because the validator will fail otherwise. Embedding APIs that don't require an API key don't fail if one is provided, so this is not breaking usage.
203        self.embeddings = LocalAIEmbeddings(
204            model=config.model_name,
205            openai_api_key=config.api_key or "dummy-api-key",
206            openai_api_base=config.base_url,
207            max_retries=15,
208            disallowed_special=(),
209        )  # type: ignore
config
embeddings
def check(self) -> Optional[str]:
211    def check(self) -> Optional[str]:
212        deployment_mode = os.environ.get("DEPLOYMENT_MODE", "")
213        if (
214            deployment_mode.casefold() == CLOUD_DEPLOYMENT_MODE
215            and not self.config.base_url.startswith("https://")
216        ):
217            return "Base URL must start with https://"
218
219        try:
220            self.embeddings.embed_query("test")
221        except Exception as e:
222            return format_exception(e)
223        return None
def embed_documents( self, documents: List[Document]) -> List[Optional[List[float]]]:
225    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
226        return cast(
227            List[Optional[List[float]]],
228            self.embeddings.embed_documents([document.page_content for document in documents]),
229        )

Embed the text of each chunk and return the resulting embedding vectors. If a chunk cannot be embedded or is configured not to be embedded, return None for that chunk.

embedding_dimensions: int
231    @property
232    def embedding_dimensions(self) -> int:
233        # vector size produced by the model
234        return self.config.dimensions
class FromFieldEmbedder(Embedder):
237class FromFieldEmbedder(Embedder):
238    def __init__(self, config: FromFieldEmbeddingConfigModel):
239        super().__init__()
240        self.config = config
241
242    def check(self) -> Optional[str]:
243        return None
244
245    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
246        """
247        From each chunk, pull the embedding from the field specified in the config.
248        Check that the field exists, is a list of numbers and is the correct size. If not, raise an AirbyteTracedException explaining the problem.
249        """
250        embeddings: List[Optional[List[float]]] = []
251        for document in documents:
252            data = document.record.data
253            if self.config.field_name not in data:
254                raise AirbyteTracedException(
255                    internal_message="Embedding vector field not found",
256                    failure_type=FailureType.config_error,
257                    message=f"Record {str(data)[:250]}... in stream {document.record.stream}  does not contain embedding vector field {self.config.field_name}. Please check your embedding configuration, the embedding vector field has to be set correctly on every record.",
258                )
259            field = data[self.config.field_name]
260            if not isinstance(field, list) or not all(isinstance(x, (int, float)) for x in field):
261                raise AirbyteTracedException(
262                    internal_message="Embedding vector field not a list of numbers",
263                    failure_type=FailureType.config_error,
264                    message=f"Record {str(data)[:250]}...  in stream {document.record.stream} does contain embedding vector field {self.config.field_name}, but it is not a list of numbers. Please check your embedding configuration, the embedding vector field has to be a list of numbers of length {self.config.dimensions} on every record.",
265                )
266            if len(field) != self.config.dimensions:
267                raise AirbyteTracedException(
268                    internal_message="Embedding vector field has wrong length",
269                    failure_type=FailureType.config_error,
270                    message=f"Record {str(data)[:250]}...  in stream {document.record.stream} does contain embedding vector field {self.config.field_name}, but it has length {len(field)} instead of the configured {self.config.dimensions}. Please check your embedding configuration, the embedding vector field has to be a list of numbers of length {self.config.dimensions} on every record.",
271                )
272            embeddings.append(field)
273
274        return embeddings
275
276    @property
277    def embedding_dimensions(self) -> int:
278        return self.config.dimensions

Embedder is an abstract class that defines the interface for embedding text.

The Indexer class uses the Embedder class internally to embed text: each indexer is responsible for passing the text of all documents to the embedder and storing the resulting embeddings in the destination. The destination connector is responsible for creating an embedder instance and passing it to the writer. The CDK defines basic embedders that every destination should support; custom embedders can be implemented for special destinations if needed.

238    def __init__(self, config: FromFieldEmbeddingConfigModel):
239        super().__init__()
240        self.config = config
config
def check(self) -> Optional[str]:
242    def check(self) -> Optional[str]:
243        return None
def embed_documents( self, documents: List[Document]) -> List[Optional[List[float]]]:
245    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
246        """
247        From each chunk, pull the embedding from the field specified in the config.
248        Check that the field exists, is a list of numbers and is the correct size. If not, raise an AirbyteTracedException explaining the problem.
249        """
250        embeddings: List[Optional[List[float]]] = []
251        for document in documents:
252            data = document.record.data
253            if self.config.field_name not in data:
254                raise AirbyteTracedException(
255                    internal_message="Embedding vector field not found",
256                    failure_type=FailureType.config_error,
257                    message=f"Record {str(data)[:250]}... in stream {document.record.stream}  does not contain embedding vector field {self.config.field_name}. Please check your embedding configuration, the embedding vector field has to be set correctly on every record.",
258                )
259            field = data[self.config.field_name]
260            if not isinstance(field, list) or not all(isinstance(x, (int, float)) for x in field):
261                raise AirbyteTracedException(
262                    internal_message="Embedding vector field not a list of numbers",
263                    failure_type=FailureType.config_error,
264                    message=f"Record {str(data)[:250]}...  in stream {document.record.stream} does contain embedding vector field {self.config.field_name}, but it is not a list of numbers. Please check your embedding configuration, the embedding vector field has to be a list of numbers of length {self.config.dimensions} on every record.",
265                )
266            if len(field) != self.config.dimensions:
267                raise AirbyteTracedException(
268                    internal_message="Embedding vector field has wrong length",
269                    failure_type=FailureType.config_error,
270                    message=f"Record {str(data)[:250]}...  in stream {document.record.stream} does contain embedding vector field {self.config.field_name}, but it has length {len(field)} instead of the configured {self.config.dimensions}. Please check your embedding configuration, the embedding vector field has to be a list of numbers of length {self.config.dimensions} on every record.",
271                )
272            embeddings.append(field)
273
274        return embeddings

From each chunk, read the embedding from the record field specified in the config. Check that the field exists, is a list of numbers, and has exactly the configured number of dimensions. If not, raise an AirbyteTracedException explaining the problem.

embedding_dimensions: int
276    @property
277    def embedding_dimensions(self) -> int:
278        return self.config.dimensions
embedder_map = {'openai': <class 'OpenAIEmbedder'>, 'cohere': <class 'CohereEmbedder'>, 'fake': <class 'FakeEmbedder'>, 'azure_openai': <class 'AzureOpenAIEmbedder'>, 'from_field': <class 'FromFieldEmbedder'>, 'openai_compatible': <class 'OpenAICompatibleEmbedder'>}
291def create_from_config(
292    embedding_config: Union[
293        AzureOpenAIEmbeddingConfigModel,
294        CohereEmbeddingConfigModel,
295        FakeEmbeddingConfigModel,
296        FromFieldEmbeddingConfigModel,
297        OpenAIEmbeddingConfigModel,
298        OpenAICompatibleEmbeddingConfigModel,
299    ],
300    processing_config: ProcessingConfigModel,
301) -> Embedder:
302    if embedding_config.mode == "azure_openai" or embedding_config.mode == "openai":
303        return cast(
304            Embedder,
305            embedder_map[embedding_config.mode](embedding_config, processing_config.chunk_size),
306        )
307    else:
308        return cast(Embedder, embedder_map[embedding_config.mode](embedding_config))