airbyte_cdk.destinations.vector_db_based.embedder
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional, Union, cast

from langchain_community.embeddings import (
    CohereEmbeddings,
    FakeEmbeddings,
    LocalAIEmbeddings,
    OpenAIEmbeddings,
)

from airbyte_cdk.destinations.vector_db_based.config import (
    AzureOpenAIEmbeddingConfigModel,
    CohereEmbeddingConfigModel,
    FakeEmbeddingConfigModel,
    FromFieldEmbeddingConfigModel,
    OpenAICompatibleEmbeddingConfigModel,
    OpenAIEmbeddingConfigModel,
    ProcessingConfigModel,
)
from airbyte_cdk.destinations.vector_db_based.utils import create_chunks, format_exception
from airbyte_cdk.models import AirbyteRecordMessage
from airbyte_cdk.utils.traced_exception import AirbyteTracedException, FailureType


@dataclass
class Document:
    page_content: str
    record: AirbyteRecordMessage


class Embedder(ABC):
    """
    Embedder is an abstract class that defines the interface for embedding text.

    The Indexer class uses the Embedder class to internally embed text: each indexer is responsible for passing the text of all documents to the embedder and storing the resulting embeddings in the destination.
    The destination connector is responsible for creating an embedder instance and passing it to the writer.
    The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.
    """

    def __init__(self) -> None:
        pass

    @abstractmethod
    def check(self) -> Optional[str]:
        pass

    @abstractmethod
    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        Embed the text of each chunk and return the resulting embedding vectors.
        If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.
        """
        pass

    @property
    @abstractmethod
    def embedding_dimensions(self) -> int:
        pass


OPEN_AI_VECTOR_SIZE = 1536

OPEN_AI_TOKEN_LIMIT = 150_000  # limit of tokens per minute


class BaseOpenAIEmbedder(Embedder):
    def __init__(self, embeddings: OpenAIEmbeddings, chunk_size: int):
        super().__init__()
        self.embeddings = embeddings
        self.chunk_size = chunk_size

    def check(self) -> Optional[str]:
        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        Embed the text of each chunk and return the resulting embedding vectors.

        As the OpenAI API will fail if more than the per-minute limit worth of tokens is sent at once, we split the request into batches and embed each batch separately.
        It's still possible to run into the rate limit between embed calls because the available token budget hasn't recovered between the calls,
        but the built-in retry mechanism of the OpenAI client handles that.
        """
        # Each chunk can hold at most self.chunk_size tokens, so tokens-per-minute divided by maximum tokens per chunk is the number of documents that can be embedded at once without exhausting the limit in a single request
        embedding_batch_size = OPEN_AI_TOKEN_LIMIT // self.chunk_size
        batches = create_chunks(documents, batch_size=embedding_batch_size)
        embeddings: List[Optional[List[float]]] = []
        for batch in batches:
            embeddings.extend(
                self.embeddings.embed_documents([chunk.page_content for chunk in batch])
            )
        return embeddings

    @property
    def embedding_dimensions(self) -> int:
        # vector size produced by the text-embedding-ada-002 model
        return OPEN_AI_VECTOR_SIZE


class OpenAIEmbedder(BaseOpenAIEmbedder):
    def __init__(self, config: OpenAIEmbeddingConfigModel, chunk_size: int):
        super().__init__(
            OpenAIEmbeddings(  # type: ignore [call-arg]
                openai_api_key=config.openai_key, max_retries=15, disallowed_special=()
            ),
            chunk_size,
        )  # type: ignore


class AzureOpenAIEmbedder(BaseOpenAIEmbedder):
    def __init__(self, config: AzureOpenAIEmbeddingConfigModel, chunk_size: int):
        # Azure OpenAI API has (as of 20230927) a limit of 16 documents per request
        super().__init__(
            OpenAIEmbeddings(  # type: ignore [call-arg]
                openai_api_key=config.openai_key,
                chunk_size=16,
                max_retries=15,
                openai_api_type="azure",
                openai_api_version="2023-05-15",
                openai_api_base=config.api_base,
                deployment=config.deployment,
                disallowed_special=(),
            ),
            chunk_size,
        )  # type: ignore


COHERE_VECTOR_SIZE = 1024


class CohereEmbedder(Embedder):
    def __init__(self, config: CohereEmbeddingConfigModel):
        super().__init__()
        # Client is set internally
        self.embeddings = CohereEmbeddings(
            cohere_api_key=config.cohere_key,
            model="embed-english-light-v2.0",
            user_agent="airbyte-cdk",
        )  # type: ignore

    def check(self) -> Optional[str]:
        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )

    @property
    def embedding_dimensions(self) -> int:
        # vector size produced by the embed-english-light-v2.0 model
        return COHERE_VECTOR_SIZE


class FakeEmbedder(Embedder):
    def __init__(self, config: FakeEmbeddingConfigModel):
        super().__init__()
        self.embeddings = FakeEmbeddings(size=OPEN_AI_VECTOR_SIZE)

    def check(self) -> Optional[str]:
        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )

    @property
    def embedding_dimensions(self) -> int:
        # use the same vector size as for OpenAI embeddings to keep it realistic
        return OPEN_AI_VECTOR_SIZE


CLOUD_DEPLOYMENT_MODE = "cloud"


class OpenAICompatibleEmbedder(Embedder):
    def __init__(self, config: OpenAICompatibleEmbeddingConfigModel):
        super().__init__()
        self.config = config
        # Client is set internally
        # Always set an API key even if there is none defined in the config because the validator will fail otherwise.
        # Embedding APIs that don't require an API key don't fail if one is provided, so this is not breaking usage.
        self.embeddings = LocalAIEmbeddings(
            model=config.model_name,
            openai_api_key=config.api_key or "dummy-api-key",
            openai_api_base=config.base_url,
            max_retries=15,
            disallowed_special=(),
        )  # type: ignore

    def check(self) -> Optional[str]:
        deployment_mode = os.environ.get("DEPLOYMENT_MODE", "")
        if (
            deployment_mode.casefold() == CLOUD_DEPLOYMENT_MODE
            and not self.config.base_url.startswith("https://")
        ):
            return "Base URL must start with https://"

        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )

    @property
    def embedding_dimensions(self) -> int:
        # vector size produced by the configured model
        return self.config.dimensions


class FromFieldEmbedder(Embedder):
    def __init__(self, config: FromFieldEmbeddingConfigModel):
        super().__init__()
        self.config = config

    def check(self) -> Optional[str]:
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        From each chunk, pull the embedding from the field specified in the config.
        Check that the field exists, is a list of numbers and has the correct size. If not, raise an AirbyteTracedException explaining the problem.
        """
        embeddings: List[Optional[List[float]]] = []
        for document in documents:
            data = document.record.data
            if self.config.field_name not in data:
                raise AirbyteTracedException(
                    internal_message="Embedding vector field not found",
                    failure_type=FailureType.config_error,
                    message=f"Record {str(data)[:250]}... in stream {document.record.stream} does not contain embedding vector field {self.config.field_name}. Please check your embedding configuration, the embedding vector field has to be set correctly on every record.",
                )
            field = data[self.config.field_name]
            if not isinstance(field, list) or not all(isinstance(x, (int, float)) for x in field):
                raise AirbyteTracedException(
                    internal_message="Embedding vector field not a list of numbers",
                    failure_type=FailureType.config_error,
                    message=f"Record {str(data)[:250]}... in stream {document.record.stream} does contain embedding vector field {self.config.field_name}, but it is not a list of numbers. Please check your embedding configuration, the embedding vector field has to be a list of numbers of length {self.config.dimensions} on every record.",
                )
            if len(field) != self.config.dimensions:
                raise AirbyteTracedException(
                    internal_message="Embedding vector field has wrong length",
                    failure_type=FailureType.config_error,
                    message=f"Record {str(data)[:250]}... in stream {document.record.stream} does contain embedding vector field {self.config.field_name}, but it has length {len(field)} instead of the configured {self.config.dimensions}. Please check your embedding configuration, the embedding vector field has to be a list of numbers of length {self.config.dimensions} on every record.",
                )
            embeddings.append(field)

        return embeddings

    @property
    def embedding_dimensions(self) -> int:
        return self.config.dimensions


embedder_map = {
    "openai": OpenAIEmbedder,
    "cohere": CohereEmbedder,
    "fake": FakeEmbedder,
    "azure_openai": AzureOpenAIEmbedder,
    "from_field": FromFieldEmbedder,
    "openai_compatible": OpenAICompatibleEmbedder,
}


def create_from_config(
    embedding_config: Union[
        AzureOpenAIEmbeddingConfigModel,
        CohereEmbeddingConfigModel,
        FakeEmbeddingConfigModel,
        FromFieldEmbeddingConfigModel,
        OpenAIEmbeddingConfigModel,
        OpenAICompatibleEmbeddingConfigModel,
    ],
    processing_config: ProcessingConfigModel,
) -> Embedder:
    if embedding_config.mode == "azure_openai" or embedding_config.mode == "openai":
        return cast(
            Embedder,
            embedder_map[embedding_config.mode](embedding_config, processing_config.chunk_size),
        )
    else:
        return cast(Embedder, embedder_map[embedding_config.mode](embedding_config))
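The following sketches illustrate how the pieces above fit together. First, the Embedder docstring notes that custom embedders can be implemented for special destinations. A minimal sketch of such a subclass is shown below; the embedding client and its embed() method are hypothetical, while the interface itself (check, embed_documents, embedding_dimensions) comes straight from this module.

from typing import List, Optional

from airbyte_cdk.destinations.vector_db_based.embedder import Document, Embedder
from airbyte_cdk.destinations.vector_db_based.utils import format_exception


class MyServiceEmbedder(Embedder):
    """Hypothetical embedder backed by an in-house embedding service."""

    def __init__(self, client, dimensions: int):
        super().__init__()
        # client is assumed to expose embed(texts: List[str]) -> List[List[float]]
        self._client = client
        self._dimensions = dimensions

    def check(self) -> Optional[str]:
        # Return None on success, or a human-readable error string on failure.
        try:
            self._client.embed(["test"])
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return list(self._client.embed([doc.page_content for doc in documents]))

    @property
    def embedding_dimensions(self) -> int:
        return self._dimensions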
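For intuition on the batching in BaseOpenAIEmbedder.embed_documents: the batch size is the per-minute token budget divided by the configured chunk size, so a single request can never exceed the limit on its own. The chunk_size value below is purely illustrative; in practice it comes from the user's ProcessingConfigModel.

OPEN_AI_TOKEN_LIMIT = 150_000  # tokens per minute, as defined in this module

chunk_size = 1_000  # illustrative value; configured via ProcessingConfigModel.chunk_size
embedding_batch_size = OPEN_AI_TOKEN_LIMIT // chunk_size  # -> 150 documents per request

# Each request then carries at most 150 chunks of at most 1,000 tokens each,
# i.e. at most 150,000 tokens, which stays within the per-minute budget.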
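The OpenAI-based embedders are normally built via create_from_config (see the last sketch below), but they can also be instantiated directly. A hedged sketch for the Azure variant: the config field names (openai_key, api_base, deployment) are inferred from the attribute accesses in the source above, and the pydantic model may accept or require additional fields; the key and URLs are placeholders.

from airbyte_cdk.destinations.vector_db_based.config import AzureOpenAIEmbeddingConfigModel
from airbyte_cdk.destinations.vector_db_based.embedder import AzureOpenAIEmbedder

config = AzureOpenAIEmbeddingConfigModel(
    mode="azure_openai",
    openai_key="<azure-openai-api-key>",
    api_base="https://your-resource.openai.azure.com",
    deployment="your-embedding-deployment",
)

embedder = AzureOpenAIEmbedder(config, chunk_size=1_000)
error = embedder.check()  # None on success, otherwise a formatted error string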
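FakeEmbedder is useful in tests because it produces vectors of the expected size without any network calls. A small sketch, assuming AirbyteRecordMessage takes the standard protocol fields (stream, data, emitted_at) and that FakeEmbeddingConfigModel is constructible from just its mode.

from airbyte_cdk.destinations.vector_db_based.config import FakeEmbeddingConfigModel
from airbyte_cdk.destinations.vector_db_based.embedder import Document, FakeEmbedder
from airbyte_cdk.models import AirbyteRecordMessage

embedder = FakeEmbedder(FakeEmbeddingConfigModel(mode="fake"))

documents = [
    Document(
        page_content="some chunked text",
        record=AirbyteRecordMessage(stream="users", data={"name": "Ada"}, emitted_at=0),
    )
]

vectors = embedder.embed_documents(documents)
assert len(vectors[0]) == embedder.embedding_dimensions  # 1536, matching OPEN_AI_VECTOR_SIZE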
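OpenAICompatibleEmbedder targets self-hosted, OpenAI-compatible endpoints (it reuses LocalAIEmbeddings under the hood). A sketch of pointing it at such a server follows; the field names mirror the attribute accesses above (model_name, api_key, base_url, dimensions), but the exact model definition lives in config.py, so treat the constructor arguments as assumptions. Note that check() rejects non-https base URLs when DEPLOYMENT_MODE is set to "cloud".

from airbyte_cdk.destinations.vector_db_based.config import OpenAICompatibleEmbeddingConfigModel
from airbyte_cdk.destinations.vector_db_based.embedder import OpenAICompatibleEmbedder

config = OpenAICompatibleEmbeddingConfigModel(
    mode="openai_compatible",
    base_url="http://localhost:8080",  # rejected by check() when DEPLOYMENT_MODE=cloud
    model_name="text-embedding-ada-002",
    api_key="",  # optional; a dummy key is substituted internally
    dimensions=1536,
)

embedder = OpenAICompatibleEmbedder(config)
print(embedder.check())  # None if the endpoint answers the embed_query("test") call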
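FromFieldEmbedder never calls an embedding API; it expects each record to already carry a vector. A sketch of the record shape it validates, using hypothetical config values (field_name="embedding", dimensions=4):

from airbyte_cdk.destinations.vector_db_based.config import FromFieldEmbeddingConfigModel
from airbyte_cdk.destinations.vector_db_based.embedder import Document, FromFieldEmbedder
from airbyte_cdk.models import AirbyteRecordMessage

embedder = FromFieldEmbedder(
    FromFieldEmbeddingConfigModel(mode="from_field", field_name="embedding", dimensions=4)
)

record = AirbyteRecordMessage(
    stream="products",
    data={"title": "widget", "embedding": [0.1, 0.2, 0.3, 0.4]},  # list of numbers of the configured length
    emitted_at=0,
)

vectors = embedder.embed_documents([Document(page_content="widget", record=record)])
# A missing field, a non-numeric list, or a wrong length raises AirbyteTracedException(config_error).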
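Finally, a destination connector typically does not pick an embedder class by hand; it passes the parsed config to create_from_config, which dispatches on mode and forwards the processing chunk size only to the OpenAI-based embedders. A sketch under the assumption that OpenAIEmbeddingConfigModel exposes an openai_key field and ProcessingConfigModel a chunk_size field (both inferred from the source above); additional required fields may be omitted here.

from airbyte_cdk.destinations.vector_db_based.config import (
    OpenAIEmbeddingConfigModel,
    ProcessingConfigModel,
)
from airbyte_cdk.destinations.vector_db_based.embedder import create_from_config

embedding_config = OpenAIEmbeddingConfigModel(mode="openai", openai_key="<your-openai-api-key>")
processing_config = ProcessingConfigModel(chunk_size=1_000)

embedder = create_from_config(embedding_config, processing_config)  # -> OpenAIEmbedder
error = embedder.check()  # None means the credentials and endpoint work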