airbyte_cdk.destinations.vector_db_based.embedder
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional, Union, cast

from langchain.embeddings.cohere import CohereEmbeddings
from langchain.embeddings.fake import FakeEmbeddings
from langchain.embeddings.localai import LocalAIEmbeddings
from langchain.embeddings.openai import OpenAIEmbeddings

from airbyte_cdk.destinations.vector_db_based.config import (
    AzureOpenAIEmbeddingConfigModel,
    CohereEmbeddingConfigModel,
    FakeEmbeddingConfigModel,
    FromFieldEmbeddingConfigModel,
    OpenAICompatibleEmbeddingConfigModel,
    OpenAIEmbeddingConfigModel,
    ProcessingConfigModel,
)
from airbyte_cdk.destinations.vector_db_based.utils import create_chunks, format_exception
from airbyte_cdk.models import AirbyteRecordMessage
from airbyte_cdk.utils.traced_exception import AirbyteTracedException, FailureType


@dataclass
class Document:
    page_content: str
    record: AirbyteRecordMessage


class Embedder(ABC):
    """
    Embedder is an abstract class that defines the interface for embedding text.

    The Indexer class uses the Embedder class to embed text internally - each indexer is responsible for passing the text of all documents to the embedder and storing the resulting embeddings in the destination.
    The destination connector is responsible for creating an embedder instance and passing it to the writer.
    The CDK defines basic embedders that should be supported in each destination. Custom embedders can be implemented for special destinations if needed.
    """

    def __init__(self) -> None:
        pass

    @abstractmethod
    def check(self) -> Optional[str]:
        pass

    @abstractmethod
    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        Embed the text of each chunk and return the resulting embedding vectors.
        If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.
        """
        pass

    @property
    @abstractmethod
    def embedding_dimensions(self) -> int:
        pass


OPEN_AI_VECTOR_SIZE = 1536

OPEN_AI_TOKEN_LIMIT = 150_000  # limit of tokens per minute


class BaseOpenAIEmbedder(Embedder):
    def __init__(self, embeddings: OpenAIEmbeddings, chunk_size: int):
        super().__init__()
        self.embeddings = embeddings
        self.chunk_size = chunk_size

    def check(self) -> Optional[str]:
        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        Embed the text of each chunk and return the resulting embedding vectors.

        Because the OpenAI API fails if more tokens than the per-minute limit are sent at once, we split the request into batches and embed each batch separately.
        It is still possible to hit the rate limit across successive embed calls because the token budget has not recovered in between,
        but the OpenAI client's built-in retry mechanism handles that.
        """
        # Each chunk can hold at most self.chunk_size tokens, so tokens-per-minute divided by maximum tokens per chunk is the number of documents that can be embedded at once without exhausting the limit in a single request
        embedding_batch_size = OPEN_AI_TOKEN_LIMIT // self.chunk_size
        batches = create_chunks(documents, batch_size=embedding_batch_size)
        embeddings: List[Optional[List[float]]] = []
        for batch in batches:
            embeddings.extend(
                self.embeddings.embed_documents([chunk.page_content for chunk in batch])
            )
        return embeddings

    @property
    def embedding_dimensions(self) -> int:
        # vector size produced by text-embedding-ada-002 model
        return OPEN_AI_VECTOR_SIZE


class OpenAIEmbedder(BaseOpenAIEmbedder):
    def __init__(self, config: OpenAIEmbeddingConfigModel, chunk_size: int):
        super().__init__(
            OpenAIEmbeddings(  # type: ignore [call-arg]
                openai_api_key=config.openai_key, max_retries=15, disallowed_special=()
            ),
            chunk_size,
        )  # type: ignore


class AzureOpenAIEmbedder(BaseOpenAIEmbedder):
    def __init__(self, config: AzureOpenAIEmbeddingConfigModel, chunk_size: int):
        # The Azure OpenAI API has a limit of 16 documents per request (as of 2023-09-27)
        super().__init__(
            OpenAIEmbeddings(  # type: ignore [call-arg]
                openai_api_key=config.openai_key,
                chunk_size=16,
                max_retries=15,
                openai_api_type="azure",
                openai_api_version="2023-05-15",
                openai_api_base=config.api_base,
                deployment=config.deployment,
                disallowed_special=(),
            ),
            chunk_size,
        )  # type: ignore


COHERE_VECTOR_SIZE = 1024


class CohereEmbedder(Embedder):
    def __init__(self, config: CohereEmbeddingConfigModel):
        super().__init__()
        # Client is set internally
        self.embeddings = CohereEmbeddings(
            cohere_api_key=config.cohere_key, model="embed-english-light-v2.0"
        )  # type: ignore

    def check(self) -> Optional[str]:
        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )

    @property
    def embedding_dimensions(self) -> int:
        # vector size produced by the embed-english-light-v2.0 model
        return COHERE_VECTOR_SIZE


class FakeEmbedder(Embedder):
    def __init__(self, config: FakeEmbeddingConfigModel):
        super().__init__()
        self.embeddings = FakeEmbeddings(size=OPEN_AI_VECTOR_SIZE)

    def check(self) -> Optional[str]:
        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )

    @property
    def embedding_dimensions(self) -> int:
        # use same vector size as for OpenAI embeddings to keep it realistic
        return OPEN_AI_VECTOR_SIZE


CLOUD_DEPLOYMENT_MODE = "cloud"


class OpenAICompatibleEmbedder(Embedder):
    def __init__(self, config: OpenAICompatibleEmbeddingConfigModel):
        super().__init__()
        self.config = config
        # Client is set internally
        # Always set an API key even if there is none defined in the config because the validator will fail otherwise.
        # Embedding APIs that don't require an API key don't fail if one is provided, so this does not break usage.
        self.embeddings = LocalAIEmbeddings(
            model=config.model_name,
            openai_api_key=config.api_key or "dummy-api-key",
            openai_api_base=config.base_url,
            max_retries=15,
            disallowed_special=(),
        )  # type: ignore

    def check(self) -> Optional[str]:
        deployment_mode = os.environ.get("DEPLOYMENT_MODE", "")
        if (
            deployment_mode.casefold() == CLOUD_DEPLOYMENT_MODE
            and not self.config.base_url.startswith("https://")
        ):
            return "Base URL must start with https://"

        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )

    @property
    def embedding_dimensions(self) -> int:
        # vector size produced by the model
        return self.config.dimensions


class FromFieldEmbedder(Embedder):
    def __init__(self, config: FromFieldEmbeddingConfigModel):
        super().__init__()
        self.config = config

    def check(self) -> Optional[str]:
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        From each chunk, pull the embedding from the field specified in the config.
        Check that the field exists, is a list of numbers and is the correct size. If not, raise an AirbyteTracedException explaining the problem.
        """
        embeddings: List[Optional[List[float]]] = []
        for document in documents:
            data = document.record.data
            if self.config.field_name not in data:
                raise AirbyteTracedException(
                    internal_message="Embedding vector field not found",
                    failure_type=FailureType.config_error,
                    message=f"Record {str(data)[:250]}... in stream {document.record.stream} does not contain embedding vector field {self.config.field_name}. Please check your embedding configuration, the embedding vector field has to be set correctly on every record.",
                )
            field = data[self.config.field_name]
            if not isinstance(field, list) or not all(isinstance(x, (int, float)) for x in field):
                raise AirbyteTracedException(
                    internal_message="Embedding vector field not a list of numbers",
                    failure_type=FailureType.config_error,
                    message=f"Record {str(data)[:250]}... in stream {document.record.stream} does contain embedding vector field {self.config.field_name}, but it is not a list of numbers. Please check your embedding configuration, the embedding vector field has to be a list of numbers of length {self.config.dimensions} on every record.",
                )
            if len(field) != self.config.dimensions:
                raise AirbyteTracedException(
                    internal_message="Embedding vector field has wrong length",
                    failure_type=FailureType.config_error,
                    message=f"Record {str(data)[:250]}... in stream {document.record.stream} does contain embedding vector field {self.config.field_name}, but it has length {len(field)} instead of the configured {self.config.dimensions}. Please check your embedding configuration, the embedding vector field has to be a list of numbers of length {self.config.dimensions} on every record.",
                )
            embeddings.append(field)

        return embeddings

    @property
    def embedding_dimensions(self) -> int:
        return self.config.dimensions


embedder_map = {
    "openai": OpenAIEmbedder,
    "cohere": CohereEmbedder,
    "fake": FakeEmbedder,
    "azure_openai": AzureOpenAIEmbedder,
    "from_field": FromFieldEmbedder,
    "openai_compatible": OpenAICompatibleEmbedder,
}


def create_from_config(
    embedding_config: Union[
        AzureOpenAIEmbeddingConfigModel,
        CohereEmbeddingConfigModel,
        FakeEmbeddingConfigModel,
        FromFieldEmbeddingConfigModel,
        OpenAIEmbeddingConfigModel,
        OpenAICompatibleEmbeddingConfigModel,
    ],
    processing_config: ProcessingConfigModel,
) -> Embedder:
    if embedding_config.mode == "azure_openai" or embedding_config.mode == "openai":
        return cast(
            Embedder,
            embedder_map[embedding_config.mode](embedding_config, processing_config.chunk_size),
        )
    else:
        return cast(Embedder, embedder_map[embedding_config.mode](embedding_config))
class Embedder(ABC):
    """
    Embedder is an abstract class that defines the interface for embedding text.

    The Indexer class uses the Embedder class to embed text internally - each indexer is responsible for passing the text of all documents to the embedder and storing the resulting embeddings in the destination.
    The destination connector is responsible for creating an embedder instance and passing it to the writer.
    The CDK defines basic embedders that should be supported in each destination. Custom embedders can be implemented for special destinations if needed.
    """

    def __init__(self) -> None:
        pass

    @abstractmethod
    def check(self) -> Optional[str]:
        pass

    @abstractmethod
    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        Embed the text of each chunk and return the resulting embedding vectors.
        If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.
        """
        pass

    @property
    @abstractmethod
    def embedding_dimensions(self) -> int:
        pass
Embedder is an abstract class that defines the interface for embedding text.
The Indexer class uses the Embedder class to embed text internally - each indexer is responsible for passing the text of all documents to the embedder and storing the resulting embeddings in the destination. The destination connector is responsible for creating an embedder instance and passing it to the writer. The CDK defines basic embedders that should be supported in each destination. Custom embedders can be implemented for special destinations if needed.
    @abstractmethod
    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        Embed the text of each chunk and return the resulting embedding vectors.
        If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.
        """
        pass
Embed the text of each chunk and return the resulting embedding vectors. If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.
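As an illustration of implementing this interface, here is a minimal sketch of a custom embedder. The hash-based vectors are purely illustrative and not part of the CDK; the point is only that the class satisfies the contract described above.

import hashlib

class HashEmbedder(Embedder):
    """Illustrative embedder that derives a fixed-size vector from a SHA-256 digest of the chunk text."""

    def __init__(self, dimensions: int = 32):
        super().__init__()
        self._dimensions = dimensions

    def check(self) -> Optional[str]:
        # Nothing external to verify, so the check always succeeds.
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        vectors: List[Optional[List[float]]] = []
        for document in documents:
            digest = hashlib.sha256(document.page_content.encode("utf-8")).digest()
            # Repeat the 32-byte digest until it covers the requested dimensionality, then scale to [0, 1].
            raw = (digest * (self._dimensions // len(digest) + 1))[: self._dimensions]
            vectors.append([byte / 255.0 for byte in raw])
        return vectors

    @property
    def embedding_dimensions(self) -> int:
        return self._dimensions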
class BaseOpenAIEmbedder(Embedder):
    def __init__(self, embeddings: OpenAIEmbeddings, chunk_size: int):
        super().__init__()
        self.embeddings = embeddings
        self.chunk_size = chunk_size

    def check(self) -> Optional[str]:
        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        Embed the text of each chunk and return the resulting embedding vectors.

        Because the OpenAI API fails if more tokens than the per-minute limit are sent at once, we split the request into batches and embed each batch separately.
        It is still possible to hit the rate limit across successive embed calls because the token budget has not recovered in between,
        but the OpenAI client's built-in retry mechanism handles that.
        """
        # Each chunk can hold at most self.chunk_size tokens, so tokens-per-minute divided by maximum tokens per chunk is the number of documents that can be embedded at once without exhausting the limit in a single request
        embedding_batch_size = OPEN_AI_TOKEN_LIMIT // self.chunk_size
        batches = create_chunks(documents, batch_size=embedding_batch_size)
        embeddings: List[Optional[List[float]]] = []
        for batch in batches:
            embeddings.extend(
                self.embeddings.embed_documents([chunk.page_content for chunk in batch])
            )
        return embeddings

    @property
    def embedding_dimensions(self) -> int:
        # vector size produced by text-embedding-ada-002 model
        return OPEN_AI_VECTOR_SIZE
Embedder is an abstract class that defines the interface for embedding text.
The Indexer class uses the Embedder class to embed text internally - each indexer is responsible for passing the text of all documents to the embedder and storing the resulting embeddings in the destination. The destination connector is responsible for creating an embedder instance and passing it to the writer. The CDK defines basic embedders that should be supported in each destination. Custom embedders can be implemented for special destinations if needed.
    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        Embed the text of each chunk and return the resulting embedding vectors.

        Because the OpenAI API fails if more tokens than the per-minute limit are sent at once, we split the request into batches and embed each batch separately.
        It is still possible to hit the rate limit across successive embed calls because the token budget has not recovered in between,
        but the OpenAI client's built-in retry mechanism handles that.
        """
        # Each chunk can hold at most self.chunk_size tokens, so tokens-per-minute divided by maximum tokens per chunk is the number of documents that can be embedded at once without exhausting the limit in a single request
        embedding_batch_size = OPEN_AI_TOKEN_LIMIT // self.chunk_size
        batches = create_chunks(documents, batch_size=embedding_batch_size)
        embeddings: List[Optional[List[float]]] = []
        for batch in batches:
            embeddings.extend(
                self.embeddings.embed_documents([chunk.page_content for chunk in batch])
            )
        return embeddings
Embed the text of each chunk and return the resulting embedding vectors.
Because the OpenAI API fails if more tokens than the per-minute limit are sent at once, we split the request into batches and embed each batch separately. It is still possible to hit the rate limit across successive embed calls because the token budget has not recovered in between, but the OpenAI client's built-in retry mechanism handles that.
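For example, with the 150,000 tokens-per-minute budget and a hypothetical chunk size of 8,191 tokens (in practice the chunk size comes from the processing configuration), the batch size works out as follows:

OPEN_AI_TOKEN_LIMIT = 150_000  # tokens per minute
chunk_size = 8_191  # hypothetical maximum tokens per chunk, taken from the processing config in practice

embedding_batch_size = OPEN_AI_TOKEN_LIMIT // chunk_size
print(embedding_batch_size)  # 18 -> at most 18 chunks are sent to the embeddings endpoint per batch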
class OpenAIEmbedder(BaseOpenAIEmbedder):
    def __init__(self, config: OpenAIEmbeddingConfigModel, chunk_size: int):
        super().__init__(
            OpenAIEmbeddings(  # type: ignore [call-arg]
                openai_api_key=config.openai_key, max_retries=15, disallowed_special=()
            ),
            chunk_size,
        )  # type: ignore
Embedder is an abstract class that defines the interface for embedding text.
The Indexer class uses the Embedder class to embed text internally - each indexer is responsible for passing the text of all documents to the embedder and storing the resulting embeddings in the destination. The destination connector is responsible for creating an embedder instance and passing it to the writer. The CDK defines basic embedders that should be supported in each destination. Custom embedders can be implemented for special destinations if needed.
class AzureOpenAIEmbedder(BaseOpenAIEmbedder):
    def __init__(self, config: AzureOpenAIEmbeddingConfigModel, chunk_size: int):
        # The Azure OpenAI API has a limit of 16 documents per request (as of 2023-09-27)
        super().__init__(
            OpenAIEmbeddings(  # type: ignore [call-arg]
                openai_api_key=config.openai_key,
                chunk_size=16,
                max_retries=15,
                openai_api_type="azure",
                openai_api_version="2023-05-15",
                openai_api_base=config.api_base,
                deployment=config.deployment,
                disallowed_special=(),
            ),
            chunk_size,
        )  # type: ignore
Embedder is an abstract class that defines the interface for embedding text.
The Indexer class uses the Embedder class to embed text internally - each indexer is responsible for passing the text of all documents to the embedder and storing the resulting embeddings in the destination. The destination connector is responsible for creating an embedder instance and passing it to the writer. The CDK defines basic embedders that should be supported in each destination. Custom embedders can be implemented for special destinations if needed.
    def __init__(self, config: AzureOpenAIEmbeddingConfigModel, chunk_size: int):
        # The Azure OpenAI API has a limit of 16 documents per request (as of 2023-09-27)
        super().__init__(
            OpenAIEmbeddings(  # type: ignore [call-arg]
                openai_api_key=config.openai_key,
                chunk_size=16,
                max_retries=15,
                openai_api_type="azure",
                openai_api_version="2023-05-15",
                openai_api_base=config.api_base,
                deployment=config.deployment,
                disallowed_special=(),
            ),
            chunk_size,
        )  # type: ignore
class CohereEmbedder(Embedder):
    def __init__(self, config: CohereEmbeddingConfigModel):
        super().__init__()
        # Client is set internally
        self.embeddings = CohereEmbeddings(
            cohere_api_key=config.cohere_key, model="embed-english-light-v2.0"
        )  # type: ignore

    def check(self) -> Optional[str]:
        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )

    @property
    def embedding_dimensions(self) -> int:
        # vector size produced by the embed-english-light-v2.0 model
        return COHERE_VECTOR_SIZE
Embedder is an abstract class that defines the interface for embedding text.
The Indexer class uses the Embedder class to embed text internally - each indexer is responsible for passing the text of all documents to the embedder and storing the resulting embeddings in the destination. The destination connector is responsible for creating an embedder instance and passing it to the writer. The CDK defines basic embedders that should be supported in each destination. Custom embedders can be implemented for special destinations if needed.
    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )
Embed the text of each chunk and return the resulting embedding vectors. If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.
class FakeEmbedder(Embedder):
    def __init__(self, config: FakeEmbeddingConfigModel):
        super().__init__()
        self.embeddings = FakeEmbeddings(size=OPEN_AI_VECTOR_SIZE)

    def check(self) -> Optional[str]:
        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )

    @property
    def embedding_dimensions(self) -> int:
        # use same vector size as for OpenAI embeddings to keep it realistic
        return OPEN_AI_VECTOR_SIZE
Embedder is an abstract class that defines the interface for embedding text.
The Indexer class uses the Embedder class to embed text internally - each indexer is responsible for passing the text of all documents to the embedder and storing the resulting embeddings in the destination. The destination connector is responsible for creating an embedder instance and passing it to the writer. The CDK defines basic embedders that should be supported in each destination. Custom embedders can be implemented for special destinations if needed.
    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )
Embed the text of each chunk and return the resulting embedding vectors. If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.
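A hedged sketch of how the fake embedder can back a unit test; it assumes FakeEmbeddingConfigModel can be constructed with its defaults and uses a placeholder record, so treat it as illustrative rather than a verbatim test from the CDK.

from airbyte_cdk.models import AirbyteRecordMessage

record = AirbyteRecordMessage(stream="test_stream", data={"text": "hello world"}, emitted_at=0)
embedder = FakeEmbedder(FakeEmbeddingConfigModel())  # assumes the config model needs no extra fields
assert embedder.check() is None

vectors = embedder.embed_documents([Document(page_content="hello world", record=record)])
assert len(vectors) == 1
assert len(vectors[0]) == OPEN_AI_VECTOR_SIZE  # 1536, matching the OpenAI vector size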
class OpenAICompatibleEmbedder(Embedder):
    def __init__(self, config: OpenAICompatibleEmbeddingConfigModel):
        super().__init__()
        self.config = config
        # Client is set internally
        # Always set an API key even if there is none defined in the config because the validator will fail otherwise.
        # Embedding APIs that don't require an API key don't fail if one is provided, so this does not break usage.
        self.embeddings = LocalAIEmbeddings(
            model=config.model_name,
            openai_api_key=config.api_key or "dummy-api-key",
            openai_api_base=config.base_url,
            max_retries=15,
            disallowed_special=(),
        )  # type: ignore

    def check(self) -> Optional[str]:
        deployment_mode = os.environ.get("DEPLOYMENT_MODE", "")
        if (
            deployment_mode.casefold() == CLOUD_DEPLOYMENT_MODE
            and not self.config.base_url.startswith("https://")
        ):
            return "Base URL must start with https://"

        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )

    @property
    def embedding_dimensions(self) -> int:
        # vector size produced by the model
        return self.config.dimensions
Embedder is an abstract class that defines the interface for embedding text.
The Indexer class uses the Embedder class to embed text internally - each indexer is responsible for passing the text of all documents to the embedder and storing the resulting embeddings in the destination. The destination connector is responsible for creating an embedder instance and passing it to the writer. The CDK defines basic embedders that should be supported in each destination. Custom embedders can be implemented for special destinations if needed.
    def __init__(self, config: OpenAICompatibleEmbeddingConfigModel):
        super().__init__()
        self.config = config
        # Client is set internally
        # Always set an API key even if there is none defined in the config because the validator will fail otherwise.
        # Embedding APIs that don't require an API key don't fail if one is provided, so this does not break usage.
        self.embeddings = LocalAIEmbeddings(
            model=config.model_name,
            openai_api_key=config.api_key or "dummy-api-key",
            openai_api_base=config.base_url,
            max_retries=15,
            disallowed_special=(),
        )  # type: ignore
    def check(self) -> Optional[str]:
        deployment_mode = os.environ.get("DEPLOYMENT_MODE", "")
        if (
            deployment_mode.casefold() == CLOUD_DEPLOYMENT_MODE
            and not self.config.base_url.startswith("https://")
        ):
            return "Base URL must start with https://"

        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None
    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )
Embed the text of each chunk and return the resulting embedding vectors. If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.
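A hedged sketch of pointing this embedder at a self-hosted, OpenAI-compatible endpoint. The URL and model name are invented, and constructing the config model directly with these keyword arguments is an assumption; in a connector the model is parsed from the configuration JSON.

config = OpenAICompatibleEmbeddingConfigModel(
    mode="openai_compatible",
    api_key="",  # empty is fine - the embedder substitutes a dummy key for the client
    base_url="https://embeddings.internal.example.com/v1",
    model_name="nomic-embed-text",
    dimensions=768,
)
embedder = OpenAICompatibleEmbedder(config)
error = embedder.check()  # returns an error string on failure, None on success

Note that when the connector runs with DEPLOYMENT_MODE=cloud, check() rejects any base_url that does not start with https://.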
class FromFieldEmbedder(Embedder):
    def __init__(self, config: FromFieldEmbeddingConfigModel):
        super().__init__()
        self.config = config

    def check(self) -> Optional[str]:
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        From each chunk, pull the embedding from the field specified in the config.
        Check that the field exists, is a list of numbers and is the correct size. If not, raise an AirbyteTracedException explaining the problem.
        """
        embeddings: List[Optional[List[float]]] = []
        for document in documents:
            data = document.record.data
            if self.config.field_name not in data:
                raise AirbyteTracedException(
                    internal_message="Embedding vector field not found",
                    failure_type=FailureType.config_error,
                    message=f"Record {str(data)[:250]}... in stream {document.record.stream} does not contain embedding vector field {self.config.field_name}. Please check your embedding configuration, the embedding vector field has to be set correctly on every record.",
                )
            field = data[self.config.field_name]
            if not isinstance(field, list) or not all(isinstance(x, (int, float)) for x in field):
                raise AirbyteTracedException(
                    internal_message="Embedding vector field not a list of numbers",
                    failure_type=FailureType.config_error,
                    message=f"Record {str(data)[:250]}... in stream {document.record.stream} does contain embedding vector field {self.config.field_name}, but it is not a list of numbers. Please check your embedding configuration, the embedding vector field has to be a list of numbers of length {self.config.dimensions} on every record.",
                )
            if len(field) != self.config.dimensions:
                raise AirbyteTracedException(
                    internal_message="Embedding vector field has wrong length",
                    failure_type=FailureType.config_error,
                    message=f"Record {str(data)[:250]}... in stream {document.record.stream} does contain embedding vector field {self.config.field_name}, but it has length {len(field)} instead of the configured {self.config.dimensions}. Please check your embedding configuration, the embedding vector field has to be a list of numbers of length {self.config.dimensions} on every record.",
                )
            embeddings.append(field)

        return embeddings

    @property
    def embedding_dimensions(self) -> int:
        return self.config.dimensions
Embedder is an abstract class that defines the interface for embedding text.
The Indexer class uses the Embedder class to embed text internally - each indexer is responsible for passing the text of all documents to the embedder and storing the resulting embeddings in the destination. The destination connector is responsible for creating an embedder instance and passing it to the writer. The CDK defines basic embedders that should be supported in each destination. Custom embedders can be implemented for special destinations if needed.
    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        From each chunk, pull the embedding from the field specified in the config.
        Check that the field exists, is a list of numbers and is the correct size. If not, raise an AirbyteTracedException explaining the problem.
        """
        embeddings: List[Optional[List[float]]] = []
        for document in documents:
            data = document.record.data
            if self.config.field_name not in data:
                raise AirbyteTracedException(
                    internal_message="Embedding vector field not found",
                    failure_type=FailureType.config_error,
                    message=f"Record {str(data)[:250]}... in stream {document.record.stream} does not contain embedding vector field {self.config.field_name}. Please check your embedding configuration, the embedding vector field has to be set correctly on every record.",
                )
            field = data[self.config.field_name]
            if not isinstance(field, list) or not all(isinstance(x, (int, float)) for x in field):
                raise AirbyteTracedException(
                    internal_message="Embedding vector field not a list of numbers",
                    failure_type=FailureType.config_error,
                    message=f"Record {str(data)[:250]}... in stream {document.record.stream} does contain embedding vector field {self.config.field_name}, but it is not a list of numbers. Please check your embedding configuration, the embedding vector field has to be a list of numbers of length {self.config.dimensions} on every record.",
                )
            if len(field) != self.config.dimensions:
                raise AirbyteTracedException(
                    internal_message="Embedding vector field has wrong length",
                    failure_type=FailureType.config_error,
                    message=f"Record {str(data)[:250]}... in stream {document.record.stream} does contain embedding vector field {self.config.field_name}, but it has length {len(field)} instead of the configured {self.config.dimensions}. Please check your embedding configuration, the embedding vector field has to be a list of numbers of length {self.config.dimensions} on every record.",
                )
            embeddings.append(field)

        return embeddings
From each chunk, pull the embedding from the field specified in the config. Check that the field exists, is a list of numbers and is the correct size. If not, raise an AirbyteTracedException explaining the problem.
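For illustration, a record that passes all three validations when the connector is configured with field_name set to "embedding" and dimensions set to 4 (both values are made up for this sketch) could look like this:

from airbyte_cdk.models import AirbyteRecordMessage

record = AirbyteRecordMessage(
    stream="products",
    data={
        "title": "Example product",
        # The configured field: a list of numbers with exactly the configured length.
        "embedding": [0.1, 0.2, 0.3, 0.4],
    },
    emitted_at=0,
)
document = Document(page_content="Example product", record=record)
# For a config with field_name="embedding" and dimensions=4, embed_documents([document])
# would return [[0.1, 0.2, 0.3, 0.4]] without raising.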
def create_from_config(
    embedding_config: Union[
        AzureOpenAIEmbeddingConfigModel,
        CohereEmbeddingConfigModel,
        FakeEmbeddingConfigModel,
        FromFieldEmbeddingConfigModel,
        OpenAIEmbeddingConfigModel,
        OpenAICompatibleEmbeddingConfigModel,
    ],
    processing_config: ProcessingConfigModel,
) -> Embedder:
    if embedding_config.mode == "azure_openai" or embedding_config.mode == "openai":
        return cast(
            Embedder,
            embedder_map[embedding_config.mode](embedding_config, processing_config.chunk_size),
        )
    else:
        return cast(Embedder, embedder_map[embedding_config.mode](embedding_config))
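A hedged sketch of how a destination might wire the factory and the resulting embedder together during a sync. Here embedding_config and processing_config are assumed to be the models parsed from the connector configuration, records stands in for the incoming AirbyteRecordMessages, and the "text" field used to build documents is an arbitrary choice for this example.

embedder = create_from_config(embedding_config, processing_config)

# Fail fast if the embedding service is unreachable or misconfigured.
error = embedder.check()
if error is not None:
    raise RuntimeError(f"Embedding configuration is invalid: {error}")

# Turn incoming records into documents and embed them in one call.
documents = [Document(page_content=record.data["text"], record=record) for record in records]
vectors = embedder.embed_documents(documents)
for document, vector in zip(documents, vectors):
    if vector is not None:
        ...  # hand the vector and its document off to the indexer here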