airbyte_cdk.destinations.vector_db_based
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from .config import (
    AzureOpenAIEmbeddingConfigModel,
    CohereEmbeddingConfigModel,
    FakeEmbeddingConfigModel,
    FromFieldEmbeddingConfigModel,
    OpenAICompatibleEmbeddingConfigModel,
    OpenAIEmbeddingConfigModel,
    ProcessingConfigModel,
)
from .document_processor import Chunk, DocumentProcessor
from .embedder import (
    AzureOpenAIEmbedder,
    CohereEmbedder,
    Embedder,
    FakeEmbedder,
    FromFieldEmbedder,
    OpenAICompatibleEmbedder,
    OpenAIEmbedder,
)
from .indexer import Indexer
from .writer import Writer

__all__ = [
    "AzureOpenAIEmbedder",
    "AzureOpenAIEmbeddingConfigModel",
    "Chunk",
    "CohereEmbedder",
    "CohereEmbeddingConfigModel",
    "DocumentProcessor",
    "Embedder",
    "FakeEmbedder",
    "FakeEmbeddingConfigModel",
    "FromFieldEmbedder",
    "FromFieldEmbeddingConfigModel",
    "Indexer",
    "OpenAICompatibleEmbedder",
    "OpenAICompatibleEmbeddingConfigModel",
    "OpenAIEmbedder",
    "OpenAIEmbeddingConfigModel",
    "ProcessingConfigModel",
    "Writer",
]
class AzureOpenAIEmbeddingConfigModel(BaseModel):
    mode: Literal["azure_openai"] = Field("azure_openai", const=True)
    openai_key: str = Field(
        ...,
        title="Azure OpenAI API key",
        airbyte_secret=True,
        description="The API key for your Azure OpenAI resource. You can find this in the Azure portal under your Azure OpenAI resource",
    )
    api_base: str = Field(
        ...,
        title="Resource base URL",
        description="The base URL for your Azure OpenAI resource. You can find this in the Azure portal under your Azure OpenAI resource",
        examples=["https://your-resource-name.openai.azure.com"],
    )
    deployment: str = Field(
        ...,
        title="Deployment",
        description="The deployment for your Azure OpenAI resource. You can find this in the Azure portal under your Azure OpenAI resource",
        examples=["your-resource-name"],
    )

    class Config(OneOfOptionConfig):
        title = "Azure OpenAI"
        description = "Use the Azure-hosted OpenAI API to embed text. This option is using the text-embedding-ada-002 model with 1536 embedding dimensions."
        discriminator = "mode"
    class Config(OneOfOptionConfig):
        title = "Azure OpenAI"
        description = "Use the Azure-hosted OpenAI API to embed text. This option is using the text-embedding-ada-002 model with 1536 embedding dimensions."
        discriminator = "mode"
Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.
Inherit from this class in the nested Config class of a model and set title and description (these show up in the UI) and discriminator (this ensures the mode field is marked as required in the schema).

Usage:

    class OptionModel(BaseModel):
        mode: Literal["option_a"] = Field("option_a", const=True)
        option_a_field: str = Field(...)

        class Config(OneOfOptionConfig):
            title = "Option A"
            description = "Option A description"
            discriminator = "mode"
@dataclass
class Chunk:
    page_content: Optional[str]
    metadata: Dict[str, Any]
    record: AirbyteRecordMessage
    embedding: Optional[List[float]] = None
class CohereEmbedder(Embedder):
    def __init__(self, config: CohereEmbeddingConfigModel):
        super().__init__()
        # Client is set internally
        self.embeddings = CohereEmbeddings(
            cohere_api_key=config.cohere_key, model="embed-english-light-v2.0"
        )  # type: ignore

    def check(self) -> Optional[str]:
        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )

    @property
    def embedding_dimensions(self) -> int:
        # vector size produced by the embed-english-light-v2.0 model
        return COHERE_VECTOR_SIZE
Embedder is an abstract class that defines the interface for embedding text.
The Indexer class uses the Embedder class to internally embed text - each indexer is responsible for passing the text of all documents to the embedder and storing the resulting embeddings in the destination. The destination connector is responsible for creating an embedder instance and passing it to the writer. The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.
    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )
Embed the text of each chunk and return the resulting embedding vectors. If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.
class CohereEmbeddingConfigModel(BaseModel):
    mode: Literal["cohere"] = Field("cohere", const=True)
    cohere_key: str = Field(..., title="Cohere API key", airbyte_secret=True)

    class Config(OneOfOptionConfig):
        title = "Cohere"
        description = "Use the Cohere API to embed text."
        discriminator = "mode"
    class Config(OneOfOptionConfig):
        title = "Cohere"
        description = "Use the Cohere API to embed text."
        discriminator = "mode"
Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.
Inherit from this class in the nested Config class of a model and set title and description (these show up in the UI) and discriminator (this ensures the mode field is marked as required in the schema).

Usage:

    class OptionModel(BaseModel):
        mode: Literal["option_a"] = Field("option_a", const=True)
        option_a_field: str = Field(...)

        class Config(OneOfOptionConfig):
            title = "Option A"
            description = "Option A description"
            discriminator = "mode"
class DocumentProcessor:
    """
    DocumentProcessor is a helper class that generates documents from Airbyte records.

    It is used to generate documents from records before writing them to the destination:
    * The text fields are extracted from the record and concatenated to a single string.
    * The metadata fields are extracted from the record and added to the document metadata.
    * The document is split into chunks of a given size using a langchain text splitter.

    The Writer class uses the DocumentProcessor class to internally generate documents from records - in most cases you don't need to use it directly,
    except if you want to implement a custom writer.

    The config parameters specified by the ProcessingConfigModel have to be made part of the connector spec to allow the user to configure the document processor.
    Calling DocumentProcessor.check_config(config) will validate the config and return an error message if the config is invalid.
    """

    streams: Mapping[str, ConfiguredAirbyteStream]

    @staticmethod
    def check_config(config: ProcessingConfigModel) -> Optional[str]:
        if config.text_splitter is not None and config.text_splitter.mode == "separator":
            for s in config.text_splitter.separators:
                try:
                    separator = json.loads(s)
                    if not isinstance(separator, str):
                        return f"Invalid separator: {s}. Separator needs to be a valid JSON string using double quotes."
                except json.decoder.JSONDecodeError:
                    return f"Invalid separator: {s}. Separator needs to be a valid JSON string using double quotes."
        return None

    def _get_text_splitter(
        self,
        chunk_size: int,
        chunk_overlap: int,
        splitter_config: Optional[TextSplitterConfigModel],
    ) -> RecursiveCharacterTextSplitter:
        if splitter_config is None:
            splitter_config = SeparatorSplitterConfigModel(mode="separator")
        if splitter_config.mode == "separator":
            return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                separators=[json.loads(s) for s in splitter_config.separators],
                keep_separator=splitter_config.keep_separator,
                disallowed_special=(),
            )
        if splitter_config.mode == "markdown":
            return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                separators=headers_to_split_on[: splitter_config.split_level],
                is_separator_regex=True,
                keep_separator=True,
                disallowed_special=(),
            )
        if splitter_config.mode == "code":
            return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                separators=RecursiveCharacterTextSplitter.get_separators_for_language(
                    Language(splitter_config.language)
                ),
                disallowed_special=(),
            )

    def __init__(self, config: ProcessingConfigModel, catalog: ConfiguredAirbyteCatalog):
        self.streams = {
            create_stream_identifier(stream.stream): stream for stream in catalog.streams
        }

        self.splitter = self._get_text_splitter(
            config.chunk_size, config.chunk_overlap, config.text_splitter
        )
        self.text_fields = config.text_fields
        self.metadata_fields = config.metadata_fields
        self.field_name_mappings = config.field_name_mappings
        self.logger = logging.getLogger("airbyte.document_processor")

    def process(self, record: AirbyteRecordMessage) -> Tuple[List[Chunk], Optional[str]]:
        """
        Generate document chunks from a record.

        :param record: AirbyteRecordMessage to process
        :return: Tuple of (list of document chunks, record id to delete if the stream is in dedup mode, to avoid stale documents in the vector store)
        """
        if CDC_DELETED_FIELD in record.data and record.data[CDC_DELETED_FIELD]:
            return [], self._extract_primary_key(record)
        doc = self._generate_document(record)
        if doc is None:
            text_fields = ", ".join(self.text_fields) if self.text_fields else "all fields"
            raise AirbyteTracedException(
                internal_message="No text fields found in record",
                message=f"Record {str(record.data)[:250]}... does not contain any of the configured text fields: {text_fields}. Please check your processing configuration, there has to be at least one text field set in each record.",
                failure_type=FailureType.config_error,
            )
        chunks = [
            Chunk(
                page_content=chunk_document.page_content,
                metadata=chunk_document.metadata,
                record=record,
            )
            for chunk_document in self._split_document(doc)
        ]
        id_to_delete = (
            doc.metadata[METADATA_RECORD_ID_FIELD]
            if METADATA_RECORD_ID_FIELD in doc.metadata
            else None
        )
        return chunks, id_to_delete

    def _generate_document(self, record: AirbyteRecordMessage) -> Optional[Document]:
        relevant_fields = self._extract_relevant_fields(record, self.text_fields)
        if len(relevant_fields) == 0:
            return None
        text = stringify_dict(relevant_fields)
        metadata = self._extract_metadata(record)
        return Document(page_content=text, metadata=metadata)

    def _extract_relevant_fields(
        self, record: AirbyteRecordMessage, fields: Optional[List[str]]
    ) -> Dict[str, Any]:
        relevant_fields = {}
        if fields and len(fields) > 0:
            for field in fields:
                values = dpath.values(record.data, field, separator=".")
                if values and len(values) > 0:
                    relevant_fields[field] = values if len(values) > 1 else values[0]
        else:
            relevant_fields = record.data
        return self._remap_field_names(relevant_fields)

    def _extract_metadata(self, record: AirbyteRecordMessage) -> Dict[str, Any]:
        metadata = self._extract_relevant_fields(record, self.metadata_fields)
        metadata[METADATA_STREAM_FIELD] = create_stream_identifier(record)
        primary_key = self._extract_primary_key(record)
        if primary_key:
            metadata[METADATA_RECORD_ID_FIELD] = primary_key
        return metadata

    def _extract_primary_key(self, record: AirbyteRecordMessage) -> Optional[str]:
        stream_identifier = create_stream_identifier(record)
        current_stream: ConfiguredAirbyteStream = self.streams[stream_identifier]
        # if the sync mode is deduping, use the primary key to upsert existing records instead of appending new ones
        if (
            not current_stream.primary_key
            or current_stream.destination_sync_mode != DestinationSyncMode.append_dedup
        ):
            return None

        primary_key = []
        for key in current_stream.primary_key:
            try:
                primary_key.append(str(dpath.get(record.data, key)))
            except KeyError:
                primary_key.append("__not_found__")
        stringified_primary_key = "_".join(primary_key)
        return f"{stream_identifier}_{stringified_primary_key}"

    def _split_document(self, doc: Document) -> List[Document]:
        chunks: List[Document] = self.splitter.split_documents([doc])
        return chunks

    def _remap_field_names(self, fields: Dict[str, Any]) -> Dict[str, Any]:
        if not self.field_name_mappings:
            return fields

        new_fields = fields.copy()
        for mapping in self.field_name_mappings:
            if mapping.from_field in new_fields:
                new_fields[mapping.to_field] = new_fields.pop(mapping.from_field)

        return new_fields
DocumentProcessor is a helper class that generates documents from Airbyte records.
It is used to generate documents from records before writing them to the destination:
- The text fields are extracted from the record and concatenated to a single string.
- The metadata fields are extracted from the record and added to the document metadata.
- The document is split into chunks of a given size using a langchain text splitter.
The Writer class uses the DocumentProcessor class to internally generate documents from records - in most cases you don't need to use it directly, except if you want to implement a custom writer.
The config parameters specified by the ProcessingConfigModel have to be made part of the connector spec to allow the user to configure the document processor. Calling DocumentProcessor.check_config(config) will validate the config and return an error message if the config is invalid.
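For example, a destination connector might validate the processing configuration up front and only then construct the processor. This is a minimal sketch using only the documented API; the helper function name is illustrative:

    from typing import Optional

    from airbyte_cdk.destinations.vector_db_based import DocumentProcessor, ProcessingConfigModel
    from airbyte_cdk.models import ConfiguredAirbyteCatalog


    def build_processor(
        config: ProcessingConfigModel, catalog: ConfiguredAirbyteCatalog
    ) -> DocumentProcessor:
        # check_config returns an error message string, or None if the config is valid
        error: Optional[str] = DocumentProcessor.check_config(config)
        if error is not None:
            # Surface the configuration problem instead of failing mid-sync
            raise ValueError(error)
        return DocumentProcessor(config, catalog)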
    def __init__(self, config: ProcessingConfigModel, catalog: ConfiguredAirbyteCatalog):
        self.streams = {
            create_stream_identifier(stream.stream): stream for stream in catalog.streams
        }

        self.splitter = self._get_text_splitter(
            config.chunk_size, config.chunk_overlap, config.text_splitter
        )
        self.text_fields = config.text_fields
        self.metadata_fields = config.metadata_fields
        self.field_name_mappings = config.field_name_mappings
        self.logger = logging.getLogger("airbyte.document_processor")
    @staticmethod
    def check_config(config: ProcessingConfigModel) -> Optional[str]:
        if config.text_splitter is not None and config.text_splitter.mode == "separator":
            for s in config.text_splitter.separators:
                try:
                    separator = json.loads(s)
                    if not isinstance(separator, str):
                        return f"Invalid separator: {s}. Separator needs to be a valid JSON string using double quotes."
                except json.decoder.JSONDecodeError:
                    return f"Invalid separator: {s}. Separator needs to be a valid JSON string using double quotes."
        return None
    def process(self, record: AirbyteRecordMessage) -> Tuple[List[Chunk], Optional[str]]:
        """
        Generate document chunks from a record.

        :param record: AirbyteRecordMessage to process
        :return: Tuple of (list of document chunks, record id to delete if the stream is in dedup mode, to avoid stale documents in the vector store)
        """
        if CDC_DELETED_FIELD in record.data and record.data[CDC_DELETED_FIELD]:
            return [], self._extract_primary_key(record)
        doc = self._generate_document(record)
        if doc is None:
            text_fields = ", ".join(self.text_fields) if self.text_fields else "all fields"
            raise AirbyteTracedException(
                internal_message="No text fields found in record",
                message=f"Record {str(record.data)[:250]}... does not contain any of the configured text fields: {text_fields}. Please check your processing configuration, there has to be at least one text field set in each record.",
                failure_type=FailureType.config_error,
            )
        chunks = [
            Chunk(
                page_content=chunk_document.page_content,
                metadata=chunk_document.metadata,
                record=record,
            )
            for chunk_document in self._split_document(doc)
        ]
        id_to_delete = (
            doc.metadata[METADATA_RECORD_ID_FIELD]
            if METADATA_RECORD_ID_FIELD in doc.metadata
            else None
        )
        return chunks, id_to_delete
Generate document chunks from a record.
Parameters
- record: AirbyteRecordMessage to process
Returns
Tuple of (list of document chunks, record id to delete if the stream is in dedup mode, to avoid stale documents in the vector store)
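A custom writer consumes this method one record at a time, deletes stale chunks for deduped streams, embeds the chunks and hands them to the indexer. The sketch below shows that flow without batching (the Writer class documented further below batches chunks before embedding); it assumes the Document dataclass used by the embedder interface is importable from the embedder module, as in the Writer source shown later:

    from airbyte_cdk.destinations.vector_db_based import DocumentProcessor, Embedder, Indexer
    from airbyte_cdk.destinations.vector_db_based.embedder import Document  # assumed location of the embedder's Document dataclass
    from airbyte_cdk.models import AirbyteRecordMessage


    def write_record(
        processor: DocumentProcessor, embedder: Embedder, indexer: Indexer, record: AirbyteRecordMessage
    ) -> None:
        chunks, id_to_delete = processor.process(record)
        if id_to_delete is not None:
            # Streams in append_dedup mode return the record id so stale chunks can be removed first
            indexer.delete([id_to_delete], record.namespace, record.stream)
        embeddings = embedder.embed_documents(
            [Document(page_content=chunk.page_content, record=chunk.record) for chunk in chunks]
        )
        for chunk, embedding in zip(chunks, embeddings):
            chunk.embedding = embedding
        indexer.index(chunks, record.namespace, record.stream)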
class Embedder(ABC):
    """
    Embedder is an abstract class that defines the interface for embedding text.

    The Indexer class uses the Embedder class to internally embed text - each indexer is responsible for passing the text of all documents to the embedder and storing the resulting embeddings in the destination.
    The destination connector is responsible for creating an embedder instance and passing it to the writer.
    The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.
    """

    def __init__(self) -> None:
        pass

    @abstractmethod
    def check(self) -> Optional[str]:
        pass

    @abstractmethod
    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        Embed the text of each chunk and return the resulting embedding vectors.
        If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.
        """
        pass

    @property
    @abstractmethod
    def embedding_dimensions(self) -> int:
        pass
Embedder is an abstract class that defines the interface for embedding text.
The Indexer class uses the Embedder class to internally embed text - each indexer is responsible for passing the text of all documents to the embedder and storing the resulting embeddings in the destination. The destination connector is responsible for creating an embedder instance and passing it to the writer. The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.
    @abstractmethod
    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        """
        Embed the text of each chunk and return the resulting embedding vectors.
        If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.
        """
        pass
Embed the text of each chunk and return the resulting embedding vectors. If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.
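To support an embedding service the CDK does not ship an embedder for, subclass Embedder and implement the three abstract members. The sketch below uses a toy hashing scheme instead of a real embedding API, purely to illustrate the interface; it assumes the Document dataclass lives in the embedder module, matching the type hints in the source above:

    import hashlib
    from typing import List, Optional

    from airbyte_cdk.destinations.vector_db_based.embedder import Document, Embedder  # assumed import path


    class HashingEmbedder(Embedder):
        """Toy embedder that turns text into deterministic pseudo-embeddings (illustration only)."""

        VECTOR_SIZE = 64

        def check(self) -> Optional[str]:
            # Nothing to validate here; a real embedder would verify credentials and return an error string on failure
            return None

        def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
            embeddings: List[Optional[List[float]]] = []
            for document in documents:
                digest = hashlib.sha256(document.page_content.encode("utf-8")).digest()
                # Repeat the digest bytes to fill the vector, then scale them to [0, 1]
                values = (list(digest) * (self.VECTOR_SIZE // len(digest) + 1))[: self.VECTOR_SIZE]
                embeddings.append([byte / 255 for byte in values])
            return embeddings

        @property
        def embedding_dimensions(self) -> int:
            return self.VECTOR_SIZE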
class FakeEmbedder(Embedder):
    def __init__(self, config: FakeEmbeddingConfigModel):
        super().__init__()
        self.embeddings = FakeEmbeddings(size=OPEN_AI_VECTOR_SIZE)

    def check(self) -> Optional[str]:
        try:
            self.embeddings.embed_query("test")
        except Exception as e:
            return format_exception(e)
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )

    @property
    def embedding_dimensions(self) -> int:
        # use same vector size as for OpenAI embeddings to keep it realistic
        return OPEN_AI_VECTOR_SIZE
Embedder is an abstract class that defines the interface for embedding text.
The Indexer class uses the Embedder class to internally embed text - each indexer is responsible for passing the text of all documents to the embedder and storing the resulting embeddings in the destination. The destination connector is responsible for creating an embedder instance and passing it to the writer. The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.
    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return cast(
            List[Optional[List[float]]],
            self.embeddings.embed_documents([document.page_content for document in documents]),
        )
Embed the text of each chunk and return the resulting embedding vectors. If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.
class FakeEmbeddingConfigModel(BaseModel):
    mode: Literal["fake"] = Field("fake", const=True)

    class Config(OneOfOptionConfig):
        title = "Fake"
        description = "Use a fake embedding made out of random vectors with 1536 embedding dimensions. This is useful for testing the data pipeline without incurring any costs."
        discriminator = "mode"
    class Config(OneOfOptionConfig):
        title = "Fake"
        description = "Use a fake embedding made out of random vectors with 1536 embedding dimensions. This is useful for testing the data pipeline without incurring any costs."
        discriminator = "mode"
Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.
Inherit from this class in the nested Config class of a model and set title and description (these show up in the UI) and discriminator (this ensures the mode field is marked as required in the schema).

Usage:

    class OptionModel(BaseModel):
        mode: Literal["option_a"] = Field("option_a", const=True)
        option_a_field: str = Field(...)

        class Config(OneOfOptionConfig):
            title = "Option A"
            description = "Option A description"
            discriminator = "mode"
class FromFieldEmbeddingConfigModel(BaseModel):
    mode: Literal["from_field"] = Field("from_field", const=True)
    field_name: str = Field(
        ...,
        title="Field name",
        description="Name of the field in the record that contains the embedding",
        examples=["embedding", "vector"],
    )
    dimensions: int = Field(
        ...,
        title="Embedding dimensions",
        description="The number of dimensions the embedding model is generating",
        examples=[1536, 384],
    )

    class Config(OneOfOptionConfig):
        title = "From Field"
        description = "Use a field in the record as the embedding. This is useful if you already have an embedding for your data and want to store it in the vector store."
        discriminator = "mode"
    class Config(OneOfOptionConfig):
        title = "From Field"
        description = "Use a field in the record as the embedding. This is useful if you already have an embedding for your data and want to store it in the vector store."
        discriminator = "mode"
Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.
Inherit from this class in the nested Config class of a model and set title and description (these show up in the UI) and discriminator (this ensures the mode field is marked as required in the schema).

Usage:

    class OptionModel(BaseModel):
        mode: Literal["option_a"] = Field("option_a", const=True)
        option_a_field: str = Field(...)

        class Config(OneOfOptionConfig):
            title = "Option A"
            description = "Option A description"
            discriminator = "mode"
class Indexer(ABC):
    """
    Indexer is an abstract class that defines the interface for indexing documents.

    The Writer class uses the Indexer class to internally index documents generated by the document processor.
    In a destination connector, implement a custom indexer by extending this class and implementing the abstract methods.
    """

    def __init__(self, config: Any):
        self.config = config

    def pre_sync(self, catalog: ConfiguredAirbyteCatalog) -> None:
        """
        Run before the sync starts. This method should be used to make sure all records in the destination that belong to streams with a destination mode of overwrite are deleted.

        Each record has a metadata field with the name airbyte_cdk.destinations.vector_db_based.document_processor.METADATA_STREAM_FIELD which can be used to filter documents for deletion.
        Use the airbyte_cdk.destinations.vector_db_based.utils.create_stream_identifier method to create the stream identifier based on the stream definition to use for filtering.
        """
        pass

    def post_sync(self) -> List[AirbyteMessage]:
        """
        Run after the sync finishes. This method should be used to perform any cleanup operations and can return a list of AirbyteMessages to be logged.
        """
        return []

    @abstractmethod
    def index(self, document_chunks: List[Chunk], namespace: str, stream: str) -> None:
        """
        Index a list of document chunks.

        This method should be used to index the documents in the destination. If page_content is None, the document should be indexed without the raw text.
        All chunks belong to the stream and namespace specified in the parameters.
        """
        pass

    @abstractmethod
    def delete(self, delete_ids: List[str], namespace: str, stream: str) -> None:
        """
        Delete document chunks belonging to certain record ids.

        This method should be used to delete documents from the destination.
        The delete_ids parameter contains a list of record ids - all chunks with a record id in this list should be deleted from the destination.
        All ids belong to the stream and namespace specified in the parameters.
        """
        pass

    @abstractmethod
    def check(self) -> Optional[str]:
        """
        Check if the indexer is configured correctly and return an error message if it is not.
        """
        pass
Indexer is an abstract class that defines the interface for indexing documents.
The Writer class uses the Indexer class to internally index documents generated by the document processor. In a destination connector, implement a custom indexer by extending this class and implementing the abstract methods.
    def pre_sync(self, catalog: ConfiguredAirbyteCatalog) -> None:
        """
        Run before the sync starts. This method should be used to make sure all records in the destination that belong to streams with a destination mode of overwrite are deleted.

        Each record has a metadata field with the name airbyte_cdk.destinations.vector_db_based.document_processor.METADATA_STREAM_FIELD which can be used to filter documents for deletion.
        Use the airbyte_cdk.destinations.vector_db_based.utils.create_stream_identifier method to create the stream identifier based on the stream definition to use for filtering.
        """
        pass
Run before the sync starts. This method should be used to make sure all records in the destination that belong to streams with a destination mode of overwrite are deleted.
Each record has a metadata field with the name airbyte_cdk.destinations.vector_db_based.document_processor.METADATA_STREAM_FIELD which can be used to filter documents for deletion. Use the airbyte_cdk.destinations.vector_db_based.utils.create_stream_identifier method to create the stream identifier based on the stream definition to use for filtering.
    def post_sync(self) -> List[AirbyteMessage]:
        """
        Run after the sync finishes. This method should be used to perform any cleanup operations and can return a list of AirbyteMessages to be logged.
        """
        return []
Run after the sync finishes. This method should be used to perform any cleanup operations and can return a list of AirbyteMessages to be logged.
    @abstractmethod
    def index(self, document_chunks: List[Chunk], namespace: str, stream: str) -> None:
        """
        Index a list of document chunks.

        This method should be used to index the documents in the destination. If page_content is None, the document should be indexed without the raw text.
        All chunks belong to the stream and namespace specified in the parameters.
        """
        pass
Index a list of document chunks.
This method should be used to index the documents in the destination. If page_content is None, the document should be indexed without the raw text. All chunks belong to the stream and namespace specified in the parameters.
    @abstractmethod
    def delete(self, delete_ids: List[str], namespace: str, stream: str) -> None:
        """
        Delete document chunks belonging to certain record ids.

        This method should be used to delete documents from the destination.
        The delete_ids parameter contains a list of record ids - all chunks with a record id in this list should be deleted from the destination.
        All ids belong to the stream and namespace specified in the parameters.
        """
        pass
Delete document chunks belonging to certain record ids.
This method should be used to delete documents from the destination. The delete_ids parameter contains a list of record ids - all chunks with a record id in this list should be deleted from the destination. All ids belong to the stream and namespace specified in the parameters.
    @abstractmethod
    def check(self) -> Optional[str]:
        """
        Check if the indexer is configured correctly and return an error message if it is not.
        """
        pass
Check if the indexer is configured correctly and return an error message if it is not.
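Putting the interface together, a destination's indexer only needs to implement these hooks. Below is a minimal sketch that keeps chunks in an in-memory dict instead of a real vector store so the control flow is easy to follow; a real destination would replace the dict with calls to its database client:

    from typing import Any, Dict, List, Optional, Tuple

    from airbyte_cdk.destinations.vector_db_based import Chunk, Indexer
    from airbyte_cdk.destinations.vector_db_based.document_processor import METADATA_RECORD_ID_FIELD
    from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog


    class InMemoryIndexer(Indexer):
        def __init__(self, config: Any):
            super().__init__(config)
            self.storage: Dict[Tuple[str, str], List[Chunk]] = {}

        def pre_sync(self, catalog: ConfiguredAirbyteCatalog) -> None:
            # A real indexer would only delete documents of streams synced in overwrite mode here
            self.storage.clear()

        def post_sync(self) -> List[AirbyteMessage]:
            return []

        def index(self, document_chunks: List[Chunk], namespace: str, stream: str) -> None:
            self.storage.setdefault((namespace, stream), []).extend(document_chunks)

        def delete(self, delete_ids: List[str], namespace: str, stream: str) -> None:
            # Drop every chunk whose record id metadata matches one of the ids to delete
            existing = self.storage.get((namespace, stream), [])
            self.storage[(namespace, stream)] = [
                chunk for chunk in existing if chunk.metadata.get(METADATA_RECORD_ID_FIELD) not in delete_ids
            ]

        def check(self) -> Optional[str]:
            return None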
class OpenAICompatibleEmbeddingConfigModel(BaseModel):
    mode: Literal["openai_compatible"] = Field("openai_compatible", const=True)
    api_key: str = Field(title="API key", default="", airbyte_secret=True)
    base_url: str = Field(
        ...,
        title="Base URL",
        description="The base URL for your OpenAI-compatible service",
        examples=["https://your-service-name.com"],
    )
    model_name: str = Field(
        title="Model name",
        description="The name of the model to use for embedding",
        default="text-embedding-ada-002",
        examples=["text-embedding-ada-002"],
    )
    dimensions: int = Field(
        title="Embedding dimensions",
        description="The number of dimensions the embedding model is generating",
        examples=[1536, 384],
    )

    class Config(OneOfOptionConfig):
        title = "OpenAI-compatible"
        description = "Use a service that's compatible with the OpenAI API to embed text."
        discriminator = "mode"
    class Config(OneOfOptionConfig):
        title = "OpenAI-compatible"
        description = "Use a service that's compatible with the OpenAI API to embed text."
        discriminator = "mode"
Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.
Inherit from this class in the nested Config class of a model and set title and description (these show up in the UI) and discriminator (this ensures the mode field is marked as required in the schema).

Usage:

    class OptionModel(BaseModel):
        mode: Literal["option_a"] = Field("option_a", const=True)
        option_a_field: str = Field(...)

        class Config(OneOfOptionConfig):
            title = "Option A"
            description = "Option A description"
            discriminator = "mode"
class OpenAIEmbedder(BaseOpenAIEmbedder):
    def __init__(self, config: OpenAIEmbeddingConfigModel, chunk_size: int):
        super().__init__(
            OpenAIEmbeddings(  # type: ignore [call-arg]
                openai_api_key=config.openai_key, max_retries=15, disallowed_special=()
            ),
            chunk_size,
        )  # type: ignore
Embedder is an abstract class that defines the interface for embedding text.
The Indexer class uses the Embedder class to internally embed text - each indexer is responsible for passing the text of all documents to the embedder and storing the resulting embeddings in the destination. The destination connector is responsible for creating an embedder instance and passing it to the writer. The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.
class OpenAIEmbeddingConfigModel(BaseModel):
    mode: Literal["openai"] = Field("openai", const=True)
    openai_key: str = Field(..., title="OpenAI API key", airbyte_secret=True)

    class Config(OneOfOptionConfig):
        title = "OpenAI"
        description = "Use the OpenAI API to embed text. This option is using the text-embedding-ada-002 model with 1536 embedding dimensions."
        discriminator = "mode"
    class Config(OneOfOptionConfig):
        title = "OpenAI"
        description = "Use the OpenAI API to embed text. This option is using the text-embedding-ada-002 model with 1536 embedding dimensions."
        discriminator = "mode"
Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.
Inherit from this class in the nested Config class of a model and set title and description (these show up in the UI) and discriminator (this ensures the mode field is marked as required in the schema).

Usage:

    class OptionModel(BaseModel):
        mode: Literal["option_a"] = Field("option_a", const=True)
        option_a_field: str = Field(...)

        class Config(OneOfOptionConfig):
            title = "Option A"
            description = "Option A description"
            discriminator = "mode"
class ProcessingConfigModel(BaseModel):
    chunk_size: int = Field(
        ...,
        title="Chunk size",
        maximum=8191,
        minimum=1,
        description="Size of chunks in tokens to store in vector store (make sure it is not too big for the context of your LLM)",
    )
    chunk_overlap: int = Field(
        title="Chunk overlap",
        description="Size of overlap between chunks in tokens to store in vector store to better capture relevant context",
        default=0,
    )
    text_fields: Optional[List[str]] = Field(
        default=[],
        title="Text fields to embed",
        description="List of fields in the record that should be used to calculate the embedding. The field list is applied to all streams in the same way and non-existing fields are ignored. If none are defined, all fields are considered text fields. When specifying text fields, you can access nested fields in the record by using dot notation, e.g. `user.name` will access the `name` field in the `user` object. It's also possible to use wildcards to access all fields in an object, e.g. `users.*.name` will access all `names` fields in all entries of the `users` array.",
        always_show=True,
        examples=["text", "user.name", "users.*.name"],
    )
    metadata_fields: Optional[List[str]] = Field(
        default=[],
        title="Fields to store as metadata",
        description="List of fields in the record that should be stored as metadata. The field list is applied to all streams in the same way and non-existing fields are ignored. If none are defined, all fields are considered metadata fields. When specifying text fields, you can access nested fields in the record by using dot notation, e.g. `user.name` will access the `name` field in the `user` object. It's also possible to use wildcards to access all fields in an object, e.g. `users.*.name` will access all `names` fields in all entries of the `users` array. When specifying nested paths, all matching values are flattened into an array set to a field named by the path.",
        always_show=True,
        examples=["age", "user", "user.name"],
    )
    text_splitter: TextSplitterConfigModel = Field(
        default=None,
        title="Text splitter",
        discriminator="mode",
        type="object",
        description="Split text fields into chunks based on the specified method.",
    )
    field_name_mappings: Optional[List[FieldNameMappingConfigModel]] = Field(
        default=[],
        title="Field name mappings",
        description="List of fields to rename. Not applicable for nested fields, but can be used to rename fields already flattened via dot notation.",
    )

    class Config:
        schema_extra = {"group": "processing"}
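As a quick reference, this is how a connector might instantiate the model directly; the field names in the example lists are illustrative only:

    from airbyte_cdk.destinations.vector_db_based import ProcessingConfigModel

    processing_config = ProcessingConfigModel(
        chunk_size=1000,                            # tokens per chunk, must fit the downstream LLM context
        chunk_overlap=50,                           # tokens shared between consecutive chunks
        text_fields=["title", "body"],              # concatenated and embedded
        metadata_fields=["author", "created_at"],   # stored alongside the vectors
    )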
class Writer:
    """
    The Writer class orchestrates the document processor, the embedder and the indexer:
    * Incoming records are passed through the document processor to generate chunks
    * Once the configured batch size is reached, the chunks are passed to the embedder to generate embeddings
    * The embedder embeds the chunks
    * The indexer deletes old chunks by the associated record id before indexing the new ones

    The destination connector is responsible for creating a writer instance and passing the input messages iterable to the write method.
    The batch size can be configured by the destination connector to either let the user configure it or hardcode it to a sensible value, depending on the destination.
    The omit_raw_text parameter can be used to omit the raw text from the chunks. This can be useful if the raw text is very large and not needed for the destination.
    """

    def __init__(
        self,
        processing_config: ProcessingConfigModel,
        indexer: Indexer,
        embedder: Embedder,
        batch_size: int,
        omit_raw_text: bool,
    ) -> None:
        self.processing_config = processing_config
        self.indexer = indexer
        self.embedder = embedder
        self.batch_size = batch_size
        self.omit_raw_text = omit_raw_text
        self._init_batch()

    def _init_batch(self) -> None:
        self.chunks: Dict[Tuple[str, str], List[Chunk]] = defaultdict(list)
        self.ids_to_delete: Dict[Tuple[str, str], List[str]] = defaultdict(list)
        self.number_of_chunks = 0

    def _convert_to_document(self, chunk: Chunk) -> Document:
        """
        Convert a chunk to a document for the embedder.
        """
        if chunk.page_content is None:
            raise ValueError("Cannot embed a chunk without page content")
        return Document(page_content=chunk.page_content, record=chunk.record)

    def _process_batch(self) -> None:
        for (namespace, stream), ids in self.ids_to_delete.items():
            self.indexer.delete(ids, namespace, stream)

        for (namespace, stream), chunks in self.chunks.items():
            embeddings = self.embedder.embed_documents(
                [self._convert_to_document(chunk) for chunk in chunks]
            )
            for i, document in enumerate(chunks):
                document.embedding = embeddings[i]
                if self.omit_raw_text:
                    document.page_content = None
            self.indexer.index(chunks, namespace, stream)

        self._init_batch()

    def write(
        self, configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        self.processor = DocumentProcessor(self.processing_config, configured_catalog)
        self.indexer.pre_sync(configured_catalog)
        for message in input_messages:
            if message.type == Type.STATE:
                # Emitting a state message indicates that all records which came before it have been written to the destination. So we flush
                # the queue to ensure writes happen, then output the state message to indicate it's safe to checkpoint state
                self._process_batch()
                yield message
            elif message.type == Type.RECORD:
                record_chunks, record_id_to_delete = self.processor.process(message.record)
                self.chunks[
                    (  # type: ignore [index] # expected "tuple[str, str]", got "tuple[str | Any | None, str | Any]"
                        message.record.namespace,  # type: ignore [union-attr] # record not None
                        message.record.stream,  # type: ignore [union-attr] # record not None
                    )
                ].extend(record_chunks)
                if record_id_to_delete is not None:
                    self.ids_to_delete[
                        (  # type: ignore [index] # expected "tuple[str, str]", got "tuple[str | Any | None, str | Any]"
                            message.record.namespace,  # type: ignore [union-attr] # record not None
                            message.record.stream,  # type: ignore [union-attr] # record not None
                        )
                    ].append(record_id_to_delete)
                self.number_of_chunks += len(record_chunks)
                if self.number_of_chunks >= self.batch_size:
                    self._process_batch()

        self._process_batch()
        yield from self.indexer.post_sync()
The Writer class orchestrates the document processor, the embedder and the indexer:
- Incoming records are passed through the document processor to generate chunks
- Once the configured batch size is reached, the chunks are passed to the embedder to generate embeddings
- The embedder embeds the chunks
- The indexer deletes old chunks by the associated record id before indexing the new ones
The destination connector is responsible for creating a writer instance and passing the input messages iterable to the write method. The batch size can be configured by the destination connector to either let the user configure it or hardcode it to a sensible value, depending on the destination. The omit_raw_text parameter can be used to omit the raw text from the chunks. This can be useful if the raw text is very large and not needed for the destination.
    def __init__(
        self,
        processing_config: ProcessingConfigModel,
        indexer: Indexer,
        embedder: Embedder,
        batch_size: int,
        omit_raw_text: bool,
    ) -> None:
        self.processing_config = processing_config
        self.indexer = indexer
        self.embedder = embedder
        self.batch_size = batch_size
        self.omit_raw_text = omit_raw_text
        self._init_batch()
    def write(
        self, configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        self.processor = DocumentProcessor(self.processing_config, configured_catalog)
        self.indexer.pre_sync(configured_catalog)
        for message in input_messages:
            if message.type == Type.STATE:
                # Emitting a state message indicates that all records which came before it have been written to the destination. So we flush
                # the queue to ensure writes happen, then output the state message to indicate it's safe to checkpoint state
                self._process_batch()
                yield message
            elif message.type == Type.RECORD:
                record_chunks, record_id_to_delete = self.processor.process(message.record)
                self.chunks[
                    (  # type: ignore [index] # expected "tuple[str, str]", got "tuple[str | Any | None, str | Any]"
                        message.record.namespace,  # type: ignore [union-attr] # record not None
                        message.record.stream,  # type: ignore [union-attr] # record not None
                    )
                ].extend(record_chunks)
                if record_id_to_delete is not None:
                    self.ids_to_delete[
                        (  # type: ignore [index] # expected "tuple[str, str]", got "tuple[str | Any | None, str | Any]"
                            message.record.namespace,  # type: ignore [union-attr] # record not None
                            message.record.stream,  # type: ignore [union-attr] # record not None
                        )
                    ].append(record_id_to_delete)
                self.number_of_chunks += len(record_chunks)
                if self.number_of_chunks >= self.batch_size:
                    self._process_batch()

        self._process_batch()
        yield from self.indexer.post_sync()
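For context, this is roughly how a destination connector might wire the pieces together in its write() implementation. The class name, the "processing" config key and the MyVectorStoreIndexer class are hypothetical placeholders; a real connector would parse its own spec and provide its own Indexer implementation:

    from typing import Any, Iterable, Mapping

    from airbyte_cdk.destinations import Destination
    from airbyte_cdk.destinations.vector_db_based import (
        FakeEmbedder,
        FakeEmbeddingConfigModel,
        ProcessingConfigModel,
        Writer,
    )
    from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog


    class DestinationMyVectorStore(Destination):  # hypothetical destination connector
        def write(
            self,
            config: Mapping[str, Any],
            configured_catalog: ConfiguredAirbyteCatalog,
            input_messages: Iterable[AirbyteMessage],
        ) -> Iterable[AirbyteMessage]:
            processing_config = ProcessingConfigModel.parse_obj(config["processing"])  # assumed config key
            embedder = FakeEmbedder(FakeEmbeddingConfigModel(mode="fake"))  # swap in a real embedder in practice
            indexer = MyVectorStoreIndexer(config)  # hypothetical Indexer subclass for this destination
            writer = Writer(processing_config, indexer, embedder, batch_size=128, omit_raw_text=False)
            yield from writer.write(configured_catalog, input_messages)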