airbyte_cdk.destinations.vector_db_based

 1#
 2# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
 3#
 4
 5from .config import (
 6    AzureOpenAIEmbeddingConfigModel,
 7    CohereEmbeddingConfigModel,
 8    FakeEmbeddingConfigModel,
 9    FromFieldEmbeddingConfigModel,
10    OpenAICompatibleEmbeddingConfigModel,
11    OpenAIEmbeddingConfigModel,
12    ProcessingConfigModel,
13)
14from .document_processor import Chunk, DocumentProcessor
15from .embedder import AzureOpenAIEmbedder, CohereEmbedder, Embedder, FakeEmbedder, FromFieldEmbedder, OpenAICompatibleEmbedder, OpenAIEmbedder
16from .indexer import Indexer
17from .writer import Writer
18
19__all__ = [
20    "AzureOpenAIEmbedder",
21    "AzureOpenAIEmbeddingConfigModel",
22    "Chunk",
23    "CohereEmbedder",
24    "CohereEmbeddingConfigModel",
25    "DocumentProcessor",
26    "Embedder",
27    "FakeEmbedder",
28    "FakeEmbeddingConfigModel",
29    "FromFieldEmbedder",
30    "FromFieldEmbeddingConfigModel",
31    "Indexer",
32    "OpenAICompatibleEmbedder",
33    "OpenAICompatibleEmbeddingConfigModel",
34    "OpenAIEmbedder",
35    "OpenAIEmbeddingConfigModel",
36    "ProcessingConfigModel",
37    "Writer",
38]
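
The exported configuration models are meant to be offered as a oneOf choice in a destination's connector spec, discriminated by their mode literal. A minimal sketch of that pattern; the EmbeddingConfig union and describe helper below are illustrative, not part of this package:

from typing import Union

from airbyte_cdk.destinations.vector_db_based import (
    CohereEmbeddingConfigModel,
    FakeEmbeddingConfigModel,
    OpenAIEmbeddingConfigModel,
)

# Hypothetical union a destination connector might expose in its spec.
EmbeddingConfig = Union[
    OpenAIEmbeddingConfigModel,
    CohereEmbeddingConfigModel,
    FakeEmbeddingConfigModel,
]

def describe(config: EmbeddingConfig) -> str:
    # The "mode" literal on each model acts as the discriminator between the options.
    return f"embedding mode: {config.mode}"

print(describe(FakeEmbeddingConfigModel()))  # embedding mode: fake
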
AzureOpenAIEmbedder
class AzureOpenAIEmbeddingConfigModel(pydantic.v1.main.BaseModel):
174class AzureOpenAIEmbeddingConfigModel(BaseModel):
175    mode: Literal["azure_openai"] = Field("azure_openai", const=True)
176    openai_key: str = Field(
177        ...,
178        title="Azure OpenAI API key",
179        airbyte_secret=True,
180        description="The API key for your Azure OpenAI resource.  You can find this in the Azure portal under your Azure OpenAI resource",
181    )
182    api_base: str = Field(
183        ...,
184        title="Resource base URL",
185        description="The base URL for your Azure OpenAI resource.  You can find this in the Azure portal under your Azure OpenAI resource",
186        examples=["https://your-resource-name.openai.azure.com"],
187    )
188    deployment: str = Field(
189        ...,
190        title="Deployment",
191        description="The deployment for your Azure OpenAI resource.  You can find this in the Azure portal under your Azure OpenAI resource",
192        examples=["your-resource-name"],
193    )
194
195    class Config(OneOfOptionConfig):
196        title = "Azure OpenAI"
197        description = "Use the Azure-hosted OpenAI API to embed text. This option is using the text-embedding-ada-002 model with 1536 embedding dimensions."
198        discriminator = "mode"
mode: Literal['azure_openai']
openai_key: str
api_base: str
deployment: str
class AzureOpenAIEmbeddingConfigModel.Config(airbyte_cdk.utils.oneof_option_config.OneOfOptionConfig):
195    class Config(OneOfOptionConfig):
196        title = "Azure OpenAI"
197        description = "Use the Azure-hosted OpenAI API to embed text. This option is using the text-embedding-ada-002 model with 1536 embedding dimensions."
198        discriminator = "mode"

Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.

Inherit from this class in the nested Config class in a model and set title and description (these show up in the UI) and discriminator (this is making sure it's marked as required in the schema).

Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
title = 'Azure OpenAI'
description = 'Use the Azure-hosted OpenAI API to embed text. This option is using the text-embedding-ada-002 model with 1536 embedding dimensions.'
discriminator = 'mode'
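
A short sketch of constructing this config model directly; the key, resource URL, and deployment name are placeholders:

from airbyte_cdk.destinations.vector_db_based import AzureOpenAIEmbeddingConfigModel

azure_config = AzureOpenAIEmbeddingConfigModel(
    openai_key="<azure-openai-api-key>",                      # placeholder
    api_base="https://your-resource-name.openai.azure.com",   # placeholder
    deployment="your-deployment-name",                        # placeholder
)
assert azure_config.mode == "azure_openai"
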
@dataclass
class Chunk:
36@dataclass
37class Chunk:
38    page_content: Optional[str]
39    metadata: Dict[str, Any]
40    record: AirbyteRecordMessage
41    embedding: Optional[List[float]] = None
Chunk( page_content: Optional[str], metadata: Dict[str, Any], record: airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteRecordMessage, embedding: Optional[List[float]] = None)
page_content: Optional[str]
metadata: Dict[str, Any]
record: airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteRecordMessage
embedding: Optional[List[float]] = None
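
A small sketch of building a Chunk by hand; in practice the DocumentProcessor produces chunks, and the metadata shown here is illustrative:

from airbyte_cdk.destinations.vector_db_based import Chunk
from airbyte_cdk.models import AirbyteRecordMessage

record = AirbyteRecordMessage(stream="users", data={"name": "Ada"}, emitted_at=0)
chunk = Chunk(
    page_content="name: Ada",
    metadata={"stream": "users"},  # illustrative metadata
    record=record,
)
print(chunk.embedding)  # None until an embedder fills it in
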
class CohereEmbedder(airbyte_cdk.destinations.vector_db_based.Embedder):
139class CohereEmbedder(Embedder):
140    def __init__(self, config: CohereEmbeddingConfigModel):
141        super().__init__()
142        # Client is set internally
143        self.embeddings = CohereEmbeddings(
144            cohere_api_key=config.cohere_key, model="embed-english-light-v2.0"
145        )  # type: ignore
146
147    def check(self) -> Optional[str]:
148        try:
149            self.embeddings.embed_query("test")
150        except Exception as e:
151            return format_exception(e)
152        return None
153
154    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
155        return cast(
156            List[Optional[List[float]]],
157            self.embeddings.embed_documents([document.page_content for document in documents]),
158        )
159
160    @property
161    def embedding_dimensions(self) -> int:
162        # vector size produced by the Cohere embedding model
163        return COHERE_VECTOR_SIZE

Embedder is an abstract class that defines the interface for embedding text.

The Indexer class uses the Embedder class to internally embed text - each indexer is responsible to pass the text of all documents to the embedder and store the resulting embeddings in the destination. The destination connector is responsible to create an embedder instance and pass it to the writer. The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.

CohereEmbedder( config: CohereEmbeddingConfigModel)
140    def __init__(self, config: CohereEmbeddingConfigModel):
141        super().__init__()
142        # Client is set internally
143        self.embeddings = CohereEmbeddings(
144            cohere_api_key=config.cohere_key, model="embed-english-light-v2.0"
145        )  # type: ignore
embeddings
def check(self) -> Optional[str]:
147    def check(self) -> Optional[str]:
148        try:
149            self.embeddings.embed_query("test")
150        except Exception as e:
151            return format_exception(e)
152        return None
def embed_documents( self, documents: List[airbyte_cdk.destinations.vector_db_based.embedder.Document]) -> List[Optional[List[float]]]:
154    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
155        return cast(
156            List[Optional[List[float]]],
157            self.embeddings.embed_documents([document.page_content for document in documents]),
158        )

Embed the text of each chunk and return the resulting embedding vectors. If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.

embedding_dimensions: int
160    @property
161    def embedding_dimensions(self) -> int:
162        # vector size produced by the Cohere embedding model
163        return COHERE_VECTOR_SIZE
class CohereEmbeddingConfigModel(pydantic.v1.main.BaseModel):
231class CohereEmbeddingConfigModel(BaseModel):
232    mode: Literal["cohere"] = Field("cohere", const=True)
233    cohere_key: str = Field(..., title="Cohere API key", airbyte_secret=True)
234
235    class Config(OneOfOptionConfig):
236        title = "Cohere"
237        description = "Use the Cohere API to embed text."
238        discriminator = "mode"
mode: Literal['cohere']
cohere_key: str
class CohereEmbeddingConfigModel.Config(airbyte_cdk.utils.oneof_option_config.OneOfOptionConfig):
235    class Config(OneOfOptionConfig):
236        title = "Cohere"
237        description = "Use the Cohere API to embed text."
238        discriminator = "mode"

Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.

Inherit from this class in the nested Config class in a model and set title and description (these show up in the UI) and discriminator (this is making sure it's marked as required in the schema).

Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
title = 'Cohere'
description = 'Use the Cohere API to embed text.'
discriminator = 'mode'
class DocumentProcessor:
 54class DocumentProcessor:
 55    """
 56    DocumentProcessor is a helper class that generates documents from Airbyte records.
 57
 58    It is used to generate documents from records before writing them to the destination:
 59    * The text fields are extracted from the record and concatenated to a single string.
 60    * The metadata fields are extracted from the record and added to the document metadata.
 61    * The document is split into chunks of a given size using a langchain text splitter.
 62
 63    The Writer class uses the DocumentProcessor class to internally generate documents from records - in most cases you don't need to use it directly,
 64    except if you want to implement a custom writer.
 65
 66    The config parameters specified by the ProcessingConfigModel have to be made part of the connector spec to allow the user to configure the document processor.
 67    Calling DocumentProcessor.check_config(config) will validate the config and return an error message if the config is invalid.
 68    """
 69
 70    streams: Mapping[str, ConfiguredAirbyteStream]
 71
 72    @staticmethod
 73    def check_config(config: ProcessingConfigModel) -> Optional[str]:
 74        if config.text_splitter is not None and config.text_splitter.mode == "separator":
 75            for s in config.text_splitter.separators:
 76                try:
 77                    separator = json.loads(s)
 78                    if not isinstance(separator, str):
 79                        return f"Invalid separator: {s}. Separator needs to be a valid JSON string using double quotes."
 80                except json.decoder.JSONDecodeError:
 81                    return f"Invalid separator: {s}. Separator needs to be a valid JSON string using double quotes."
 82        return None
 83
 84    def _get_text_splitter(
 85        self,
 86        chunk_size: int,
 87        chunk_overlap: int,
 88        splitter_config: Optional[TextSplitterConfigModel],
 89    ) -> RecursiveCharacterTextSplitter:
 90        if splitter_config is None:
 91            splitter_config = SeparatorSplitterConfigModel(mode="separator")
 92        if splitter_config.mode == "separator":
 93            return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
 94                chunk_size=chunk_size,
 95                chunk_overlap=chunk_overlap,
 96                separators=[json.loads(s) for s in splitter_config.separators],
 97                keep_separator=splitter_config.keep_separator,
 98                disallowed_special=(),
 99            )
100        if splitter_config.mode == "markdown":
101            return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
102                chunk_size=chunk_size,
103                chunk_overlap=chunk_overlap,
104                separators=headers_to_split_on[: splitter_config.split_level],
105                is_separator_regex=True,
106                keep_separator=True,
107                disallowed_special=(),
108            )
109        if splitter_config.mode == "code":
110            return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
111                chunk_size=chunk_size,
112                chunk_overlap=chunk_overlap,
113                separators=RecursiveCharacterTextSplitter.get_separators_for_language(
114                    Language(splitter_config.language)
115                ),
116                disallowed_special=(),
117            )
118
119    def __init__(self, config: ProcessingConfigModel, catalog: ConfiguredAirbyteCatalog):
120        self.streams = {
121            create_stream_identifier(stream.stream): stream for stream in catalog.streams
122        }
123
124        self.splitter = self._get_text_splitter(
125            config.chunk_size, config.chunk_overlap, config.text_splitter
126        )
127        self.text_fields = config.text_fields
128        self.metadata_fields = config.metadata_fields
129        self.field_name_mappings = config.field_name_mappings
130        self.logger = logging.getLogger("airbyte.document_processor")
131
132    def process(self, record: AirbyteRecordMessage) -> Tuple[List[Chunk], Optional[str]]:
133        """
134        Generate documents from records.
135        :param record: the AirbyteRecordMessage to process
136        :return: Tuple of (List of document chunks, record id to delete if a stream is in dedup mode to avoid stale documents in the vector store)
137        """
138        if CDC_DELETED_FIELD in record.data and record.data[CDC_DELETED_FIELD]:
139            return [], self._extract_primary_key(record)
140        doc = self._generate_document(record)
141        if doc is None:
142            text_fields = ", ".join(self.text_fields) if self.text_fields else "all fields"
143            raise AirbyteTracedException(
144                internal_message="No text fields found in record",
145                message=f"Record {str(record.data)[:250]}... does not contain any of the configured text fields: {text_fields}. Please check your processing configuration, there has to be at least one text field set in each record.",
146                failure_type=FailureType.config_error,
147            )
148        chunks = [
149            Chunk(
150                page_content=chunk_document.page_content,
151                metadata=chunk_document.metadata,
152                record=record,
153            )
154            for chunk_document in self._split_document(doc)
155        ]
156        id_to_delete = (
157            doc.metadata[METADATA_RECORD_ID_FIELD]
158            if METADATA_RECORD_ID_FIELD in doc.metadata
159            else None
160        )
161        return chunks, id_to_delete
162
163    def _generate_document(self, record: AirbyteRecordMessage) -> Optional[Document]:
164        relevant_fields = self._extract_relevant_fields(record, self.text_fields)
165        if len(relevant_fields) == 0:
166            return None
167        text = stringify_dict(relevant_fields)
168        metadata = self._extract_metadata(record)
169        return Document(page_content=text, metadata=metadata)
170
171    def _extract_relevant_fields(
172        self, record: AirbyteRecordMessage, fields: Optional[List[str]]
173    ) -> Dict[str, Any]:
174        relevant_fields = {}
175        if fields and len(fields) > 0:
176            for field in fields:
177                values = dpath.values(record.data, field, separator=".")
178                if values and len(values) > 0:
179                    relevant_fields[field] = values if len(values) > 1 else values[0]
180        else:
181            relevant_fields = record.data
182        return self._remap_field_names(relevant_fields)
183
184    def _extract_metadata(self, record: AirbyteRecordMessage) -> Dict[str, Any]:
185        metadata = self._extract_relevant_fields(record, self.metadata_fields)
186        metadata[METADATA_STREAM_FIELD] = create_stream_identifier(record)
187        primary_key = self._extract_primary_key(record)
188        if primary_key:
189            metadata[METADATA_RECORD_ID_FIELD] = primary_key
190        return metadata
191
192    def _extract_primary_key(self, record: AirbyteRecordMessage) -> Optional[str]:
193        stream_identifier = create_stream_identifier(record)
194        current_stream: ConfiguredAirbyteStream = self.streams[stream_identifier]
195        # if the sync mode is deduping, use the primary key to upsert existing records instead of appending new ones
196        if (
197            not current_stream.primary_key
198            or current_stream.destination_sync_mode != DestinationSyncMode.append_dedup
199        ):
200            return None
201
202        primary_key = []
203        for key in current_stream.primary_key:
204            try:
205                primary_key.append(str(dpath.get(record.data, key)))
206            except KeyError:
207                primary_key.append("__not_found__")
208        stringified_primary_key = "_".join(primary_key)
209        return f"{stream_identifier}_{stringified_primary_key}"
210
211    def _split_document(self, doc: Document) -> List[Document]:
212        chunks: List[Document] = self.splitter.split_documents([doc])
213        return chunks
214
215    def _remap_field_names(self, fields: Dict[str, Any]) -> Dict[str, Any]:
216        if not self.field_name_mappings:
217            return fields
218
219        new_fields = fields.copy()
220        for mapping in self.field_name_mappings:
221            if mapping.from_field in new_fields:
222                new_fields[mapping.to_field] = new_fields.pop(mapping.from_field)
223
224        return new_fields

DocumentProcessor is a helper class that generates documents from Airbyte records.

It is used to generate documents from records before writing them to the destination:

  • The text fields are extracted from the record and concatenated to a single string.
  • The metadata fields are extracted from the record and added to the document metadata.
  • The document is split into chunks of a given size using a langchain text splitter.

The Writer class uses the DocumentProcessor class to internally generate documents from records - in most cases you don't need to use it directly, except if you want to implement a custom writer.

The config parameters specified by the ProcessingConfigModel have to be made part of the connector spec to allow the user to configure the document processor. Calling DocumentProcessor.check_config(config) will validate the config and return an error message if the config is invalid.
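
For example, check_config rejects a separator that is not a valid JSON string. A minimal sketch, assuming the CDK's vector-db-based extras are installed and that SeparatorSplitterConfigModel is importable from the config module, as the processor source above suggests:

from airbyte_cdk.destinations.vector_db_based import DocumentProcessor, ProcessingConfigModel
from airbyte_cdk.destinations.vector_db_based.config import SeparatorSplitterConfigModel

config = ProcessingConfigModel(
    chunk_size=512,
    text_splitter=SeparatorSplitterConfigModel(
        mode="separator",
        separators=['"\\n\\n"', "not-a-json-string"],  # the second entry is invalid on purpose
    ),
)
print(DocumentProcessor.check_config(config))
# Invalid separator: not-a-json-string. Separator needs to be a valid JSON string using double quotes.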

DocumentProcessor( config: ProcessingConfigModel, catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog)
119    def __init__(self, config: ProcessingConfigModel, catalog: ConfiguredAirbyteCatalog):
120        self.streams = {
121            create_stream_identifier(stream.stream): stream for stream in catalog.streams
122        }
123
124        self.splitter = self._get_text_splitter(
125            config.chunk_size, config.chunk_overlap, config.text_splitter
126        )
127        self.text_fields = config.text_fields
128        self.metadata_fields = config.metadata_fields
129        self.field_name_mappings = config.field_name_mappings
130        self.logger = logging.getLogger("airbyte.document_processor")
streams: Mapping[str, airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteStream]
@staticmethod
def check_config( config: ProcessingConfigModel) -> Optional[str]:
72    @staticmethod
73    def check_config(config: ProcessingConfigModel) -> Optional[str]:
74        if config.text_splitter is not None and config.text_splitter.mode == "separator":
75            for s in config.text_splitter.separators:
76                try:
77                    separator = json.loads(s)
78                    if not isinstance(separator, str):
79                        return f"Invalid separator: {s}. Separator needs to be a valid JSON string using double quotes."
80                except json.decoder.JSONDecodeError:
81                    return f"Invalid separator: {s}. Separator needs to be a valid JSON string using double quotes."
82        return None
splitter
text_fields
metadata_fields
field_name_mappings
logger
def process( self, record: airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteRecordMessage) -> Tuple[List[Chunk], Optional[str]]:
132    def process(self, record: AirbyteRecordMessage) -> Tuple[List[Chunk], Optional[str]]:
133        """
134        Generate documents from records.
135        :param record: the AirbyteRecordMessage to process
136        :return: Tuple of (List of document chunks, record id to delete if a stream is in dedup mode to avoid stale documents in the vector store)
137        """
138        if CDC_DELETED_FIELD in record.data and record.data[CDC_DELETED_FIELD]:
139            return [], self._extract_primary_key(record)
140        doc = self._generate_document(record)
141        if doc is None:
142            text_fields = ", ".join(self.text_fields) if self.text_fields else "all fields"
143            raise AirbyteTracedException(
144                internal_message="No text fields found in record",
145                message=f"Record {str(record.data)[:250]}... does not contain any of the configured text fields: {text_fields}. Please check your processing configuration, there has to be at least one text field set in each record.",
146                failure_type=FailureType.config_error,
147            )
148        chunks = [
149            Chunk(
150                page_content=chunk_document.page_content,
151                metadata=chunk_document.metadata,
152                record=record,
153            )
154            for chunk_document in self._split_document(doc)
155        ]
156        id_to_delete = (
157            doc.metadata[METADATA_RECORD_ID_FIELD]
158            if METADATA_RECORD_ID_FIELD in doc.metadata
159            else None
160        )
161        return chunks, id_to_delete

Generate documents from records.

Parameters
  • record: the AirbyteRecordMessage to process
Returns

Tuple of (List of document chunks, record id to delete if a stream is in dedup mode to avoid stale documents in the vector store)
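
A minimal end-to-end sketch of processing a single record, assuming the CDK's vector-db-based extras (langchain, tiktoken) are installed; stream and field names are illustrative:

from airbyte_cdk.destinations.vector_db_based import DocumentProcessor, ProcessingConfigModel
from airbyte_cdk.models import (
    AirbyteRecordMessage,
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
)

catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=AirbyteStream(
                name="users", json_schema={}, supported_sync_modes=[SyncMode.full_refresh]
            ),
            sync_mode=SyncMode.full_refresh,
            destination_sync_mode=DestinationSyncMode.append,
        )
    ]
)
config = ProcessingConfigModel(chunk_size=512, text_fields=["bio"], metadata_fields=["name"])
processor = DocumentProcessor(config, catalog)

record = AirbyteRecordMessage(
    stream="users", data={"name": "Ada", "bio": "Wrote the first program."}, emitted_at=0
)
chunks, id_to_delete = processor.process(record)
# chunks[0].page_content holds the stringified "bio" field and chunks[0].metadata holds
# "name" plus the stream identifier; id_to_delete is None because the stream is not deduped.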

class Embedder(abc.ABC):
36class Embedder(ABC):
37    """
38    Embedder is an abstract class that defines the interface for embedding text.
39
40    The Indexer class uses the Embedder class to internally embed text - each indexer is responsible to pass the text of all documents to the embedder and store the resulting embeddings in the destination.
41    The destination connector is responsible to create an embedder instance and pass it to the writer.
42    The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.
43    """
44
45    def __init__(self) -> None:
46        pass
47
48    @abstractmethod
49    def check(self) -> Optional[str]:
50        pass
51
52    @abstractmethod
53    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
54        """
55        Embed the text of each chunk and return the resulting embedding vectors.
56        If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.
57        """
58        pass
59
60    @property
61    @abstractmethod
62    def embedding_dimensions(self) -> int:
63        pass

Embedder is an abstract class that defines the interface for embedding text.

The Indexer class uses the Embedder class to internally embed text - each indexer is responsible to pass the text of all documents to the embedder and store the resulting embeddings in the destination. The destination connector is responsible to create an embedder instance and pass it to the writer. The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.

@abstractmethod
def check(self) -> Optional[str]:
48    @abstractmethod
49    def check(self) -> Optional[str]:
50        pass
@abstractmethod
def embed_documents( self, documents: List[airbyte_cdk.destinations.vector_db_based.embedder.Document]) -> List[Optional[List[float]]]:
52    @abstractmethod
53    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
54        """
55        Embed the text of each chunk and return the resulting embedding vectors.
56        If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.
57        """
58        pass

Embed the text of each chunk and return the resulting embedding vectors. If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.

embedding_dimensions: int
60    @property
61    @abstractmethod
62    def embedding_dimensions(self) -> int:
63        pass
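
A destination can supply its own embedder by implementing the three abstract members. An illustrative sketch that returns fixed zero vectors rather than calling a real embedding service:

from typing import List, Optional

from airbyte_cdk.destinations.vector_db_based import Embedder
from airbyte_cdk.destinations.vector_db_based.embedder import Document

class ConstantEmbedder(Embedder):
    """Illustrative embedder producing fixed-size zero vectors."""

    def __init__(self, dimensions: int = 8):
        super().__init__()
        self._dimensions = dimensions

    def check(self) -> Optional[str]:
        # Return None when the embedder is usable, or an error message otherwise.
        return None

    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
        return [[0.0] * self._dimensions for _ in documents]

    @property
    def embedding_dimensions(self) -> int:
        return self._dimensions
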
class FakeEmbedder(airbyte_cdk.destinations.vector_db_based.Embedder):
166class FakeEmbedder(Embedder):
167    def __init__(self, config: FakeEmbeddingConfigModel):
168        super().__init__()
169        self.embeddings = FakeEmbeddings(size=OPEN_AI_VECTOR_SIZE)
170
171    def check(self) -> Optional[str]:
172        try:
173            self.embeddings.embed_query("test")
174        except Exception as e:
175            return format_exception(e)
176        return None
177
178    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
179        return cast(
180            List[Optional[List[float]]],
181            self.embeddings.embed_documents([document.page_content for document in documents]),
182        )
183
184    @property
185    def embedding_dimensions(self) -> int:
186        # use same vector size as for OpenAI embeddings to keep it realistic
187        return OPEN_AI_VECTOR_SIZE

Embedder is an abstract class that defines the interface for embedding text.

The Indexer class uses the Embedder class to internally embed text - each indexer is responsible to pass the text of all documents to the embedder and store the resulting embeddings in the destination. The destination connector is responsible to create an embedder instance and pass it to the writer. The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.

FakeEmbedder( config: FakeEmbeddingConfigModel)
167    def __init__(self, config: FakeEmbeddingConfigModel):
168        super().__init__()
169        self.embeddings = FakeEmbeddings(size=OPEN_AI_VECTOR_SIZE)
embeddings
def check(self) -> Optional[str]:
171    def check(self) -> Optional[str]:
172        try:
173            self.embeddings.embed_query("test")
174        except Exception as e:
175            return format_exception(e)
176        return None
def embed_documents( self, documents: List[airbyte_cdk.destinations.vector_db_based.embedder.Document]) -> List[Optional[List[float]]]:
178    def embed_documents(self, documents: List[Document]) -> List[Optional[List[float]]]:
179        return cast(
180            List[Optional[List[float]]],
181            self.embeddings.embed_documents([document.page_content for document in documents]),
182        )

Embed the text of each chunk and return the resulting embedding vectors. If a chunk cannot be embedded or is configured to not be embedded, return None for that chunk.

embedding_dimensions: int
184    @property
185    def embedding_dimensions(self) -> int:
186        # use same vector size as for OpenAI embeddings to keep it realistic
187        return OPEN_AI_VECTOR_SIZE
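
Because it needs no credentials, FakeEmbedder is handy for exercising the pipeline. A short usage sketch, assuming the langchain dependency that provides FakeEmbeddings is installed:

from airbyte_cdk.destinations.vector_db_based import FakeEmbedder, FakeEmbeddingConfigModel
from airbyte_cdk.destinations.vector_db_based.embedder import Document
from airbyte_cdk.models import AirbyteRecordMessage

embedder = FakeEmbedder(FakeEmbeddingConfigModel())
assert embedder.check() is None  # an error string would be returned if the embedder were unusable

record = AirbyteRecordMessage(stream="docs", data={"text": "hello world"}, emitted_at=0)
vectors = embedder.embed_documents([Document(page_content="hello world", record=record)])
print(len(vectors[0]))  # 1536, matching embedder.embedding_dimensions
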
class FakeEmbeddingConfigModel(pydantic.v1.main.BaseModel):
201class FakeEmbeddingConfigModel(BaseModel):
202    mode: Literal["fake"] = Field("fake", const=True)
203
204    class Config(OneOfOptionConfig):
205        title = "Fake"
206        description = "Use a fake embedding made out of random vectors with 1536 embedding dimensions. This is useful for testing the data pipeline without incurring any costs."
207        discriminator = "mode"
mode: Literal['fake']
class FakeEmbeddingConfigModel.Config(airbyte_cdk.utils.oneof_option_config.OneOfOptionConfig):
204    class Config(OneOfOptionConfig):
205        title = "Fake"
206        description = "Use a fake embedding made out of random vectors with 1536 embedding dimensions. This is useful for testing the data pipeline without incurring any costs."
207        discriminator = "mode"

Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.

Inherit from this class in the nested Config class in a model and set title and description (these show up in the UI) and discriminator (this is making sure it's marked as required in the schema).

Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
title = 'Fake'
description = 'Use a fake embedding made out of random vectors with 1536 embedding dimensions. This is useful for testing the data pipeline without incurring any costs.'
discriminator = 'mode'
FromFieldEmbedder
class FromFieldEmbeddingConfigModel(pydantic.v1.main.BaseModel):
210class FromFieldEmbeddingConfigModel(BaseModel):
211    mode: Literal["from_field"] = Field("from_field", const=True)
212    field_name: str = Field(
213        ...,
214        title="Field name",
215        description="Name of the field in the record that contains the embedding",
216        examples=["embedding", "vector"],
217    )
218    dimensions: int = Field(
219        ...,
220        title="Embedding dimensions",
221        description="The number of dimensions the embedding model is generating",
222        examples=[1536, 384],
223    )
224
225    class Config(OneOfOptionConfig):
226        title = "From Field"
227        description = "Use a field in the record as the embedding. This is useful if you already have an embedding for your data and want to store it in the vector store."
228        discriminator = "mode"
mode: Literal['from_field']
field_name: str
dimensions: int
class FromFieldEmbeddingConfigModel.Config(airbyte_cdk.utils.oneof_option_config.OneOfOptionConfig):
225    class Config(OneOfOptionConfig):
226        title = "From Field"
227        description = "Use a field in the record as the embedding. This is useful if you already have an embedding for your data and want to store it in the vector store."
228        discriminator = "mode"

Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.

Inherit from this class in the nested Config class in a model and set title and description (these show up in the UI) and discriminator (this is making sure it's marked as required in the schema).

Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
title = 'From Field'
description = 'Use a field in the record as the embedding. This is useful if you already have an embedding for your data and want to store it in the vector store.'
discriminator = 'mode'
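
If the source records already contain embeddings, this model tells the destination which field holds them and how many dimensions to expect; the values below are illustrative:

from airbyte_cdk.destinations.vector_db_based import FromFieldEmbeddingConfigModel

from_field_config = FromFieldEmbeddingConfigModel(field_name="embedding", dimensions=1536)
assert from_field_config.mode == "from_field"
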
class Indexer(abc.ABC):
14class Indexer(ABC):
15    """
16    Indexer is an abstract class that defines the interface for indexing documents.
17
18    The Writer class uses the Indexer class to internally index documents generated by the document processor.
19    In a destination connector, implement a custom indexer by extending this class and implementing the abstract methods.
20    """
21
22    def __init__(self, config: Any):
23        self.config = config
24        pass
25
26    def pre_sync(self, catalog: ConfiguredAirbyteCatalog) -> None:
27        """
28        Run before the sync starts. This method should be used to make sure all records in the destination that belong to streams with a destination mode of overwrite are deleted.
29
30        Each record has a metadata field with the name airbyte_cdk.destinations.vector_db_based.document_processor.METADATA_STREAM_FIELD which can be used to filter documents for deletion.
31        Use the airbyte_cdk.destinations.vector_db_based.utils.create_stream_identifier method to create the stream identifier based on the stream definition to use for filtering.
32        """
33        pass
34
35    def post_sync(self) -> List[AirbyteMessage]:
36        """
37        Run after the sync finishes. This method should be used to perform any cleanup operations and can return a list of AirbyteMessages to be logged.
38        """
39        return []
40
41    @abstractmethod
42    def index(self, document_chunks: List[Chunk], namespace: str, stream: str) -> None:
43        """
44        Index a list of document chunks.
45
46        This method should be used to index the documents in the destination. If page_content is None, the document should be indexed without the raw text.
47        All chunks belong to the stream and namespace specified in the parameters.
48        """
49        pass
50
51    @abstractmethod
52    def delete(self, delete_ids: List[str], namespace: str, stream: str) -> None:
53        """
54        Delete document chunks belonging to certain record ids.
55
56        This method should be used to delete documents from the destination.
57        The delete_ids parameter contains a list of record ids - all chunks with a record id in this list should be deleted from the destination.
58        All ids belong to the stream and namespace specified in the parameters.
59        """
60        pass
61
62    @abstractmethod
63    def check(self) -> Optional[str]:
64        """
65        Check if the indexer is configured correctly. This method should be used to check if the indexer is configured correctly and return an error message if it is not.
66        """
67        pass

Indexer is an abstract class that defines the interface for indexing documents.

The Writer class uses the Indexer class to internally index documents generated by the document processor. In a destination connector, implement a custom indexer by extending this class and implementing the abstract methods.

config
def pre_sync( self, catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog) -> None:
26    def pre_sync(self, catalog: ConfiguredAirbyteCatalog) -> None:
27        """
28        Run before the sync starts. This method should be used to make sure all records in the destination that belong to streams with a destination mode of overwrite are deleted.
29
30        Each record has a metadata field with the name airbyte_cdk.destinations.vector_db_based.document_processor.METADATA_STREAM_FIELD which can be used to filter documents for deletion.
31        Use the airbyte_cdk.destinations.vector_db_based.utils.create_stream_identifier method to create the stream identifier based on the stream definition to use for filtering.
32        """
33        pass

Run before the sync starts. This method should be used to make sure all records in the destination that belong to streams with a destination mode of overwrite are deleted.

Each record has a metadata field with the name airbyte_cdk.destinations.vector_db_based.document_processor.METADATA_STREAM_FIELD which can be used to filter documents for deletion. Use the airbyte_cdk.destinations.vector_db_based.utils.create_stream_identifier method to create the stream identifier based on the stream definition to use for filtering.

def post_sync(self) -> List[airbyte_cdk.AirbyteMessage]:
35    def post_sync(self) -> List[AirbyteMessage]:
36        """
37        Run after the sync finishes. This method should be used to perform any cleanup operations and can return a list of AirbyteMessages to be logged.
38        """
39        return []

Run after the sync finishes. This method should be used to perform any cleanup operations and can return a list of AirbyteMessages to be logged.

@abstractmethod
def index( self, document_chunks: List[Chunk], namespace: str, stream: str) -> None:
41    @abstractmethod
42    def index(self, document_chunks: List[Chunk], namespace: str, stream: str) -> None:
43        """
44        Index a list of document chunks.
45
46        This method should be used to index the documents in the destination. If page_content is None, the document should be indexed without the raw text.
47        All chunks belong to the stream and namespace specified in the parameters.
48        """
49        pass

Index a list of document chunks.

This method should be used to index the documents in the destination. If page_content is None, the document should be indexed without the raw text. All chunks belong to the stream and namespace specified in the parameters.

@abstractmethod
def delete(self, delete_ids: List[str], namespace: str, stream: str) -> None:
51    @abstractmethod
52    def delete(self, delete_ids: List[str], namespace: str, stream: str) -> None:
53        """
54        Delete document chunks belonging to certain record ids.
55
56        This method should be used to delete documents from the destination.
57        The delete_ids parameter contains a list of record ids - all chunks with a record id in this list should be deleted from the destination.
58        All ids belong to the stream and namespace specified in the parameters.
59        """
60        pass

Delete document chunks belonging to certain record ids.

This method should be used to delete documents from the destination. The delete_ids parameter contains a list of record ids - all chunks with a record id in this list should be deleted from the destination. All ids belong to the stream and namespace specified in the parameters.

@abstractmethod
def check(self) -> Optional[str]:
62    @abstractmethod
63    def check(self) -> Optional[str]:
64        """
65        Check if the indexer is configured correctly. This method should be used to check if the indexer is configured correctly and return an error message if it is not.
66        """
67        pass

Check if the indexer is configured correctly. This method should be used to check if the indexer is configured correctly and return an error message if it is not.
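
A sketch of a custom indexer that keeps chunks in memory instead of a real vector store; it is illustrative only, but it implements every abstract member and uses the record-id metadata field written by the document processor:

from typing import Any, Dict, List, Optional, Tuple

from airbyte_cdk.destinations.vector_db_based import Chunk, Indexer
from airbyte_cdk.destinations.vector_db_based.document_processor import METADATA_RECORD_ID_FIELD
from airbyte_cdk.models import ConfiguredAirbyteCatalog

class InMemoryIndexer(Indexer):
    """Stores chunks in a dict keyed by (namespace, stream) - for illustration only."""

    def __init__(self, config: Any):
        super().__init__(config)
        self.storage: Dict[Tuple[str, str], List[Chunk]] = {}

    def pre_sync(self, catalog: ConfiguredAirbyteCatalog) -> None:
        # A real indexer should only delete documents of streams configured with the
        # overwrite destination mode, filtering on the stream metadata field.
        self.storage.clear()

    def index(self, document_chunks: List[Chunk], namespace: str, stream: str) -> None:
        self.storage.setdefault((namespace, stream), []).extend(document_chunks)

    def delete(self, delete_ids: List[str], namespace: str, stream: str) -> None:
        self.storage[(namespace, stream)] = [
            chunk
            for chunk in self.storage.get((namespace, stream), [])
            if chunk.metadata.get(METADATA_RECORD_ID_FIELD) not in delete_ids
        ]

    def check(self) -> Optional[str]:
        return None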

OpenAICompatibleEmbedder
class OpenAICompatibleEmbeddingConfigModel(pydantic.v1.main.BaseModel):
147class OpenAICompatibleEmbeddingConfigModel(BaseModel):
148    mode: Literal["openai_compatible"] = Field("openai_compatible", const=True)
149    api_key: str = Field(title="API key", default="", airbyte_secret=True)
150    base_url: str = Field(
151        ...,
152        title="Base URL",
153        description="The base URL for your OpenAI-compatible service",
154        examples=["https://your-service-name.com"],
155    )
156    model_name: str = Field(
157        title="Model name",
158        description="The name of the model to use for embedding",
159        default="text-embedding-ada-002",
160        examples=["text-embedding-ada-002"],
161    )
162    dimensions: int = Field(
163        title="Embedding dimensions",
164        description="The number of dimensions the embedding model is generating",
165        examples=[1536, 384],
166    )
167
168    class Config(OneOfOptionConfig):
169        title = "OpenAI-compatible"
170        description = "Use a service that's compatible with the OpenAI API to embed text."
171        discriminator = "mode"
mode: Literal['openai_compatible']
api_key: str
base_url: str
model_name: str
dimensions: int
class OpenAICompatibleEmbeddingConfigModel.Config(airbyte_cdk.utils.oneof_option_config.OneOfOptionConfig):
168    class Config(OneOfOptionConfig):
169        title = "OpenAI-compatible"
170        description = "Use a service that's compatible with the OpenAI API to embed text."
171        discriminator = "mode"

Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.

Inherit from this class in the nested Config class in a model and set title and description (these show up in the UI) and discriminator (this is making sure it's marked as required in the schema).

Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
title = 'OpenAI-compatible'
description = "Use a service that's compatible with the OpenAI API to embed text."
discriminator = 'mode'
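
A sketch for pointing the embedder at a self-hosted, OpenAI-compatible endpoint; the URL, model name, and dimensions are placeholders:

from airbyte_cdk.destinations.vector_db_based import OpenAICompatibleEmbeddingConfigModel

compat_config = OpenAICompatibleEmbeddingConfigModel(
    base_url="https://your-service-name.com",   # placeholder
    model_name="text-embedding-ada-002",
    dimensions=1536,
)
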
class OpenAIEmbedder(airbyte_cdk.destinations.vector_db_based.embedder.BaseOpenAIEmbedder):
108class OpenAIEmbedder(BaseOpenAIEmbedder):
109    def __init__(self, config: OpenAIEmbeddingConfigModel, chunk_size: int):
110        super().__init__(
111            OpenAIEmbeddings(  # type: ignore [call-arg]
112                openai_api_key=config.openai_key, max_retries=15, disallowed_special=()
113            ),
114            chunk_size,
115        )  # type: ignore

Embedder is an abstract class that defines the interface for embedding text.

The Indexer class uses the Embedder class to internally embed text - each indexer is responsible to pass the text of all documents to the embedder and store the resulting embeddings in the destination. The destination connector is responsible to create an embedder instance and pass it to the writer. The CDK defines basic embedders that should be supported in each destination. It is possible to implement custom embedders for special destinations if needed.

OpenAIEmbedder( config: OpenAIEmbeddingConfigModel, chunk_size: int)
109    def __init__(self, config: OpenAIEmbeddingConfigModel, chunk_size: int):
110        super().__init__(
111            OpenAIEmbeddings(  # type: ignore [call-arg]
112                openai_api_key=config.openai_key, max_retries=15, disallowed_special=()
113            ),
114            chunk_size,
115        )  # type: ignore
class OpenAIEmbeddingConfigModel(pydantic.v1.main.BaseModel):
137class OpenAIEmbeddingConfigModel(BaseModel):
138    mode: Literal["openai"] = Field("openai", const=True)
139    openai_key: str = Field(..., title="OpenAI API key", airbyte_secret=True)
140
141    class Config(OneOfOptionConfig):
142        title = "OpenAI"
143        description = "Use the OpenAI API to embed text. This option is using the text-embedding-ada-002 model with 1536 embedding dimensions."
144        discriminator = "mode"
mode: Literal['openai']
openai_key: str
class OpenAIEmbeddingConfigModel.Config(airbyte_cdk.utils.oneof_option_config.OneOfOptionConfig):
141    class Config(OneOfOptionConfig):
142        title = "OpenAI"
143        description = "Use the OpenAI API to embed text. This option is using the text-embedding-ada-002 model with 1536 embedding dimensions."
144        discriminator = "mode"

Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.

Inherit from this class in the nested Config class in a model and set title and description (these show up in the UI) and discriminator (this is making sure it's marked as required in the schema).

Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
title = 'OpenAI'
description = 'Use the OpenAI API to embed text. This option is using the text-embedding-ada-002 model with 1536 embedding dimensions.'
discriminator = 'mode'
class ProcessingConfigModel(pydantic.v1.main.BaseModel):
 93class ProcessingConfigModel(BaseModel):
 94    chunk_size: int = Field(
 95        ...,
 96        title="Chunk size",
 97        maximum=8191,
 98        minimum=1,
 99        description="Size of chunks in tokens to store in vector store (make sure it is not too big for the context if your LLM)",
100    )
101    chunk_overlap: int = Field(
102        title="Chunk overlap",
103        description="Size of overlap between chunks in tokens to store in vector store to better capture relevant context",
104        default=0,
105    )
106    text_fields: Optional[List[str]] = Field(
107        default=[],
108        title="Text fields to embed",
109        description="List of fields in the record that should be used to calculate the embedding. The field list is applied to all streams in the same way and non-existing fields are ignored. If none are defined, all fields are considered text fields. When specifying text fields, you can access nested fields in the record by using dot notation, e.g. `user.name` will access the `name` field in the `user` object. It's also possible to use wildcards to access all fields in an object, e.g. `users.*.name` will access all `names` fields in all entries of the `users` array.",
110        always_show=True,
111        examples=["text", "user.name", "users.*.name"],
112    )
113    metadata_fields: Optional[List[str]] = Field(
114        default=[],
115        title="Fields to store as metadata",
116        description="List of fields in the record that should be stored as metadata. The field list is applied to all streams in the same way and non-existing fields are ignored. If none are defined, all fields are considered metadata fields. When specifying text fields, you can access nested fields in the record by using dot notation, e.g. `user.name` will access the `name` field in the `user` object. It's also possible to use wildcards to access all fields in an object, e.g. `users.*.name` will access all `names` fields in all entries of the `users` array. When specifying nested paths, all matching values are flattened into an array set to a field named by the path.",
117        always_show=True,
118        examples=["age", "user", "user.name"],
119    )
120    text_splitter: TextSplitterConfigModel = Field(
121        default=None,
122        title="Text splitter",
123        discriminator="mode",
124        type="object",
125        description="Split text fields into chunks based on the specified method.",
126    )
127    field_name_mappings: Optional[List[FieldNameMappingConfigModel]] = Field(
128        default=[],
129        title="Field name mappings",
130        description="List of fields to rename. Not applicable for nested fields, but can be used to rename fields already flattened via dot notation.",
131    )
132
133    class Config:
134        schema_extra = {"group": "processing"}
chunk_size: int
chunk_overlap: int
text_fields: Optional[List[str]]
metadata_fields: Optional[List[str]]
class ProcessingConfigModel.Config:
133    class Config:
134        schema_extra = {"group": "processing"}
schema_extra = {'group': 'processing'}
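
A sketch of a processing configuration that uses dot notation and wildcards as described above; the field names are illustrative:

from airbyte_cdk.destinations.vector_db_based import ProcessingConfigModel

processing_config = ProcessingConfigModel(
    chunk_size=1000,
    chunk_overlap=100,
    text_fields=["title", "user.name", "comments.*.body"],
    metadata_fields=["id", "user.name"],
)
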
class Writer:
 17class Writer:
 18    """
 19    The Writer class is orchestrating the document processor, the embedder and the indexer:
 20    * Incoming records are passed through the document processor to generate chunks
 21    * Once the configured batch size is reached, the chunks are passed to the embedder to generate embeddings
 22    * The embedder embeds the chunks
 23    * The indexer deletes old chunks by the associated record id before indexing the new ones
 24
 25    The destination connector is responsible to create a writer instance and pass the input messages iterable to the write method.
 26    The batch size can be configured by the destination connector to give the freedom of either letting the user configure it or hardcoding it to a sensible value depending on the destination.
 27    The omit_raw_text parameter can be used to omit the raw text from the chunks. This can be useful if the raw text is very large and not needed for the destination.
 28    """
 29
 30    def __init__(
 31        self,
 32        processing_config: ProcessingConfigModel,
 33        indexer: Indexer,
 34        embedder: Embedder,
 35        batch_size: int,
 36        omit_raw_text: bool,
 37    ) -> None:
 38        self.processing_config = processing_config
 39        self.indexer = indexer
 40        self.embedder = embedder
 41        self.batch_size = batch_size
 42        self.omit_raw_text = omit_raw_text
 43        self._init_batch()
 44
 45    def _init_batch(self) -> None:
 46        self.chunks: Dict[Tuple[str, str], List[Chunk]] = defaultdict(list)
 47        self.ids_to_delete: Dict[Tuple[str, str], List[str]] = defaultdict(list)
 48        self.number_of_chunks = 0
 49
 50    def _convert_to_document(self, chunk: Chunk) -> Document:
 51        """
 52        Convert a chunk to a document for the embedder.
 53        """
 54        if chunk.page_content is None:
 55            raise ValueError("Cannot embed a chunk without page content")
 56        return Document(page_content=chunk.page_content, record=chunk.record)
 57
 58    def _process_batch(self) -> None:
 59        for (namespace, stream), ids in self.ids_to_delete.items():
 60            self.indexer.delete(ids, namespace, stream)
 61
 62        for (namespace, stream), chunks in self.chunks.items():
 63            embeddings = self.embedder.embed_documents(
 64                [self._convert_to_document(chunk) for chunk in chunks]
 65            )
 66            for i, document in enumerate(chunks):
 67                document.embedding = embeddings[i]
 68                if self.omit_raw_text:
 69                    document.page_content = None
 70            self.indexer.index(chunks, namespace, stream)
 71
 72        self._init_batch()
 73
 74    def write(
 75        self, configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
 76    ) -> Iterable[AirbyteMessage]:
 77        self.processor = DocumentProcessor(self.processing_config, configured_catalog)
 78        self.indexer.pre_sync(configured_catalog)
 79        for message in input_messages:
 80            if message.type == Type.STATE:
 81                # Emitting a state message indicates that all records which came before it have been written to the destination. So we flush
 82                # the queue to ensure writes happen, then output the state message to indicate it's safe to checkpoint state
 83                self._process_batch()
 84                yield message
 85            elif message.type == Type.RECORD:
 86                record_chunks, record_id_to_delete = self.processor.process(message.record)
 87                self.chunks[
 88                    (  # type: ignore [index] # expected "tuple[str, str]", got "tuple[str | Any | None, str | Any]"
 89                        message.record.namespace,  # type: ignore [union-attr] # record not None
 90                        message.record.stream,  # type: ignore [union-attr] # record not None
 91                    )
 92                ].extend(record_chunks)
 93                if record_id_to_delete is not None:
 94                    self.ids_to_delete[
 95                        (  # type: ignore [index] # expected "tuple[str, str]", got "tuple[str | Any | None, str | Any]"
 96                            message.record.namespace,  # type: ignore [union-attr] # record not None
 97                            message.record.stream,  # type: ignore [union-attr] # record not None
 98                        )
 99                    ].append(record_id_to_delete)
100                self.number_of_chunks += len(record_chunks)
101                if self.number_of_chunks >= self.batch_size:
102                    self._process_batch()
103
104        self._process_batch()
105        yield from self.indexer.post_sync()

The Writer class is orchestrating the document processor, the embedder and the indexer:

  • Incoming records are passed through the document processor to generate chunks
  • Once the configured batch size is reached, the chunks are passed to the embedder to generate embeddings
  • The embedder embeds the chunks
  • The indexer deletes old chunks by the associated record id before indexing the new ones

The destination connector is responsible to create a writer instance and pass the input messages iterable to the write method. The batch size can be configured by the destination connector to give the freedom of either letting the user configure it or hardcoding it to a sensible value depending on the destination. The omit_raw_text parameter can be used to omit the raw text from the chunks. This can be useful if the raw text is very large and not needed for the destination.
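
A sketch of how a destination connector might wire everything together and drive a sync. The catalog and indexer names below are assumed to exist (for example the in-memory indexer sketch earlier on this page), and the catalog must contain the "docs" stream:

from airbyte_cdk.destinations.vector_db_based import (
    FakeEmbedder,
    FakeEmbeddingConfigModel,
    ProcessingConfigModel,
    Writer,
)
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, Type

writer = Writer(
    processing_config=ProcessingConfigModel(chunk_size=512, text_fields=["text"]),
    indexer=indexer,  # assumed Indexer implementation
    embedder=FakeEmbedder(FakeEmbeddingConfigModel()),
    batch_size=32,
    omit_raw_text=False,
)

messages = [
    AirbyteMessage(
        type=Type.RECORD,
        record=AirbyteRecordMessage(stream="docs", data={"text": "hello world"}, emitted_at=0),
    )
]

# write() flushes the batch on STATE messages and at the end of the input, yielding
# state messages plus anything returned by the indexer's post_sync.
for output_message in writer.write(catalog, messages):  # catalog: ConfiguredAirbyteCatalog, assumed
    print(output_message)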

Writer( processing_config: ProcessingConfigModel, indexer: Indexer, embedder: Embedder, batch_size: int, omit_raw_text: bool)
30    def __init__(
31        self,
32        processing_config: ProcessingConfigModel,
33        indexer: Indexer,
34        embedder: Embedder,
35        batch_size: int,
36        omit_raw_text: bool,
37    ) -> None:
38        self.processing_config = processing_config
39        self.indexer = indexer
40        self.embedder = embedder
41        self.batch_size = batch_size
42        self.omit_raw_text = omit_raw_text
43        self._init_batch()
processing_config
indexer
embedder
batch_size
omit_raw_text
def write( self, configured_catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, input_messages: Iterable[airbyte_cdk.AirbyteMessage]) -> Iterable[airbyte_cdk.AirbyteMessage]:
 74    def write(
 75        self, configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
 76    ) -> Iterable[AirbyteMessage]:
 77        self.processor = DocumentProcessor(self.processing_config, configured_catalog)
 78        self.indexer.pre_sync(configured_catalog)
 79        for message in input_messages:
 80            if message.type == Type.STATE:
 81                # Emitting a state message indicates that all records which came before it have been written to the destination. So we flush
 82                # the queue to ensure writes happen, then output the state message to indicate it's safe to checkpoint state
 83                self._process_batch()
 84                yield message
 85            elif message.type == Type.RECORD:
 86                record_chunks, record_id_to_delete = self.processor.process(message.record)
 87                self.chunks[
 88                    (  # type: ignore [index] # expected "tuple[str, str]", got "tuple[str | Any | None, str | Any]"
 89                        message.record.namespace,  # type: ignore [union-attr] # record not None
 90                        message.record.stream,  # type: ignore [union-attr] # record not None
 91                    )
 92                ].extend(record_chunks)
 93                if record_id_to_delete is not None:
 94                    self.ids_to_delete[
 95                        (  # type: ignore [index] # expected "tuple[str, str]", got "tuple[str | Any | None, str | Any]"
 96                            message.record.namespace,  # type: ignore [union-attr] # record not None
 97                            message.record.stream,  # type: ignore [union-attr] # record not None
 98                        )
 99                    ].append(record_id_to_delete)
100                self.number_of_chunks += len(record_chunks)
101                if self.number_of_chunks >= self.batch_size:
102                    self._process_batch()
103
104        self._process_batch()
105        yield from self.indexer.post_sync()