airbyte_cdk.destinations.vector_db_based.document_processor

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import json
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping, Optional, Tuple

import dpath
from langchain.text_splitter import Language, RecursiveCharacterTextSplitter
from langchain.utils import stringify_dict
from langchain_core.documents.base import Document

from airbyte_cdk.destinations.vector_db_based.config import (
    ProcessingConfigModel,
    SeparatorSplitterConfigModel,
    TextSplitterConfigModel,
)
from airbyte_cdk.destinations.vector_db_based.utils import create_stream_identifier
from airbyte_cdk.models import (
    AirbyteRecordMessage,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
)
from airbyte_cdk.utils.traced_exception import AirbyteTracedException, FailureType

METADATA_STREAM_FIELD = "_ab_stream"
METADATA_RECORD_ID_FIELD = "_ab_record_id"

CDC_DELETED_FIELD = "_ab_cdc_deleted_at"


@dataclass
class Chunk:
    page_content: Optional[str]
    metadata: Dict[str, Any]
    record: AirbyteRecordMessage
    embedding: Optional[List[float]] = None


headers_to_split_on = [
    "(?:^|\n)# ",
    "(?:^|\n)## ",
    "(?:^|\n)### ",
    "(?:^|\n)#### ",
    "(?:^|\n)##### ",
    "(?:^|\n)###### ",
]


class DocumentProcessor:
    """
    DocumentProcessor is a helper class that generates documents from Airbyte records.

    It is used to generate documents from records before writing them to the destination:
    * The text fields are extracted from the record and concatenated to a single string.
    * The metadata fields are extracted from the record and added to the document metadata.
    * The document is split into chunks of a given size using a langchain text splitter.

    The Writer class uses the DocumentProcessor class internally to generate documents from records; you don't need to use it directly
    unless you are implementing a custom writer.

    The config parameters specified by the ProcessingConfigModel have to be made part of the connector spec to allow the user to configure the document processor.
    Calling DocumentProcessor.check_config(config) will validate the config and return an error message if the config is invalid.
    """

    streams: Mapping[str, ConfiguredAirbyteStream]

    @staticmethod
    def check_config(config: ProcessingConfigModel) -> Optional[str]:
        if config.text_splitter is not None and config.text_splitter.mode == "separator":
            for s in config.text_splitter.separators:
                try:
                    separator = json.loads(s)
                    if not isinstance(separator, str):
                        return f"Invalid separator: {s}. Separator needs to be a valid JSON string using double quotes."
                except json.decoder.JSONDecodeError:
                    return f"Invalid separator: {s}. Separator needs to be a valid JSON string using double quotes."
        return None

    def _get_text_splitter(
        self,
        chunk_size: int,
        chunk_overlap: int,
        splitter_config: Optional[TextSplitterConfigModel],
    ) -> RecursiveCharacterTextSplitter:
        if splitter_config is None:
            splitter_config = SeparatorSplitterConfigModel(mode="separator")
        if splitter_config.mode == "separator":
            return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                separators=[json.loads(s) for s in splitter_config.separators],
                keep_separator=splitter_config.keep_separator,
                disallowed_special=(),
            )
        if splitter_config.mode == "markdown":
            return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                separators=headers_to_split_on[: splitter_config.split_level],
                is_separator_regex=True,
                keep_separator=True,
                disallowed_special=(),
            )
        if splitter_config.mode == "code":
            return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                separators=RecursiveCharacterTextSplitter.get_separators_for_language(
                    Language(splitter_config.language)
                ),
                disallowed_special=(),
            )

    def __init__(self, config: ProcessingConfigModel, catalog: ConfiguredAirbyteCatalog):
        self.streams = {
            create_stream_identifier(stream.stream): stream for stream in catalog.streams
        }

        self.splitter = self._get_text_splitter(
            config.chunk_size, config.chunk_overlap, config.text_splitter
        )
        self.text_fields = config.text_fields
        self.metadata_fields = config.metadata_fields
        self.field_name_mappings = config.field_name_mappings
        self.logger = logging.getLogger("airbyte.document_processor")

    def process(self, record: AirbyteRecordMessage) -> Tuple[List[Chunk], Optional[str]]:
        """
        Generate documents from a record.
        :param record: AirbyteRecordMessage to process
        :return: Tuple of (list of document chunks, record id to delete if the stream is in dedup mode, to avoid stale documents in the vector store)
        """
        if CDC_DELETED_FIELD in record.data and record.data[CDC_DELETED_FIELD]:
            return [], self._extract_primary_key(record)
        doc = self._generate_document(record)
        if doc is None:
            text_fields = ", ".join(self.text_fields) if self.text_fields else "all fields"
            raise AirbyteTracedException(
                internal_message="No text fields found in record",
                message=f"Record {str(record.data)[:250]}... does not contain any of the configured text fields: {text_fields}. Please check your processing configuration, there has to be at least one text field set in each record.",
                failure_type=FailureType.config_error,
            )
        chunks = [
            Chunk(
                page_content=chunk_document.page_content,
                metadata=chunk_document.metadata,
                record=record,
            )
            for chunk_document in self._split_document(doc)
        ]
        id_to_delete = (
            doc.metadata[METADATA_RECORD_ID_FIELD]
            if METADATA_RECORD_ID_FIELD in doc.metadata
            else None
        )
        return chunks, id_to_delete

    def _generate_document(self, record: AirbyteRecordMessage) -> Optional[Document]:
        relevant_fields = self._extract_relevant_fields(record, self.text_fields)
        if len(relevant_fields) == 0:
            return None
        text = stringify_dict(relevant_fields)
        metadata = self._extract_metadata(record)
        return Document(page_content=text, metadata=metadata)

    def _extract_relevant_fields(
        self, record: AirbyteRecordMessage, fields: Optional[List[str]]
    ) -> Dict[str, Any]:
        relevant_fields = {}
        if fields and len(fields) > 0:
            for field in fields:
                values = dpath.values(record.data, field, separator=".")
                if values and len(values) > 0:
                    relevant_fields[field] = values if len(values) > 1 else values[0]
        else:
            relevant_fields = record.data
        return self._remap_field_names(relevant_fields)

    def _extract_metadata(self, record: AirbyteRecordMessage) -> Dict[str, Any]:
        metadata = self._extract_relevant_fields(record, self.metadata_fields)
        metadata[METADATA_STREAM_FIELD] = create_stream_identifier(record)
        primary_key = self._extract_primary_key(record)
        if primary_key:
            metadata[METADATA_RECORD_ID_FIELD] = primary_key
        return metadata

    def _extract_primary_key(self, record: AirbyteRecordMessage) -> Optional[str]:
        stream_identifier = create_stream_identifier(record)
        current_stream: ConfiguredAirbyteStream = self.streams[stream_identifier]
        # if the sync mode is deduping, use the primary key to upsert existing records instead of appending new ones
        if (
            not current_stream.primary_key
            or current_stream.destination_sync_mode != DestinationSyncMode.append_dedup
        ):
            return None

        primary_key = []
        for key in current_stream.primary_key:
            try:
                primary_key.append(str(dpath.get(record.data, key)))
            except KeyError:
                primary_key.append("__not_found__")
        stringified_primary_key = "_".join(primary_key)
        return f"{stream_identifier}_{stringified_primary_key}"

    def _split_document(self, doc: Document) -> List[Document]:
        chunks: List[Document] = self.splitter.split_documents([doc])
        return chunks

    def _remap_field_names(self, fields: Dict[str, Any]) -> Dict[str, Any]:
        if not self.field_name_mappings:
            return fields

        new_fields = fields.copy()
        for mapping in self.field_name_mappings:
            if mapping.from_field in new_fields:
                new_fields[mapping.to_field] = new_fields.pop(mapping.from_field)

        return new_fields
METADATA_STREAM_FIELD = '_ab_stream'
METADATA_RECORD_ID_FIELD = '_ab_record_id'
CDC_DELETED_FIELD = '_ab_cdc_deleted_at'
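These reserved field names are attached to every chunk's metadata: METADATA_STREAM_FIELD always carries the stream identifier, while METADATA_RECORD_ID_FIELD is only set for streams synced in append_dedup mode. As a purely illustrative sketch (the exact identifier format comes from create_stream_identifier; the stream name, primary key value, and extra metadata field below are hypothetical), a chunk's metadata could look like this:

# Hypothetical example: a "users" stream in append_dedup mode with primary key ["id"] == 42
# and "author" configured as a metadata field.
chunk_metadata = {
    "_ab_stream": "users",        # METADATA_STREAM_FIELD: identifier of the originating stream
    "_ab_record_id": "users_42",  # METADATA_RECORD_ID_FIELD: stream identifier plus primary key
    "author": "jane",             # any configured metadata_fields extracted from the record
}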
@dataclass
class Chunk:
Chunk(page_content: Optional[str], metadata: Dict[str, Any], record: airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteRecordMessage, embedding: Optional[List[float]] = None)
page_content: Optional[str]
metadata: Dict[str, Any]
record: airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteRecordMessage
embedding: Optional[List[float]] = None
headers_to_split_on = ['(?:^|\n)# ', '(?:^|\n)## ', '(?:^|\n)### ', '(?:^|\n)#### ', '(?:^|\n)##### ', '(?:^|\n)###### ']
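In the markdown splitter mode, split_level selects a prefix of this list, so a level of 2 splits only on level-one and level-two headers. A minimal sketch of how these regexes are handed to the langchain splitter, mirroring what _get_text_splitter does in markdown mode (the chunk size, overlap, and split level below are hypothetical):

from airbyte_cdk.destinations.vector_db_based.document_processor import headers_to_split_on
from langchain.text_splitter import RecursiveCharacterTextSplitter

split_level = 2  # hypothetical: split on "# " and "## " headers only
markdown_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=1000,  # hypothetical chunk size, measured in tokens
    chunk_overlap=0,  # hypothetical overlap between consecutive chunks
    separators=headers_to_split_on[:split_level],
    is_separator_regex=True,  # the entries above are regexes, not literal separators
    keep_separator=True,      # keep the header marker attached to the following chunk
    disallowed_special=(),
)
chunks = markdown_splitter.split_text("# Title\nintro\n## Section\nbody")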
class DocumentProcessor:

DocumentProcessor is a helper class that generates documents from Airbyte records.

It is used to generate documents from records before writing them to the destination:

  • The text fields are extracted from the record and concatenated to a single string.
  • The metadata fields are extracted from the record and added to the document metadata.
  • The document is split into chunks of a given size using a langchain text splitter.

The Writer class uses the DocumentProcessor class internally to generate documents from records; you don't need to use it directly unless you are implementing a custom writer.

The config parameters specified by the ProcessingConfigModel have to be made part of the connector spec to allow the user to configure the document processor. Calling DocumentProcessor.check_config(config) will validate the config and return an error message if the config is invalid.
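A minimal usage sketch, assuming hypothetical field names, chunking values, and catalog contents; a real destination would build the config from the connector spec and receive the configured catalog from the platform:

from airbyte_cdk.destinations.vector_db_based.config import ProcessingConfigModel
from airbyte_cdk.destinations.vector_db_based.document_processor import DocumentProcessor
from airbyte_cdk.models import (
    AirbyteRecordMessage,
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
)

# Hypothetical processing config: ~1000-token chunks, embed the "body" field,
# carry "author" along as chunk metadata.
config = ProcessingConfigModel(
    chunk_size=1000,
    chunk_overlap=0,
    text_fields=["body"],
    metadata_fields=["author"],
)

# Validate user-supplied settings (e.g. custom separators) before doing any work.
error = DocumentProcessor.check_config(config)
if error is not None:
    raise ValueError(error)

# A one-stream catalog, shaped like what a destination receives from the platform.
catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=AirbyteStream(
                name="articles",
                json_schema={},
                supported_sync_modes=[SyncMode.full_refresh],
            ),
            sync_mode=SyncMode.full_refresh,
            destination_sync_mode=DestinationSyncMode.append,
        )
    ]
)

processor = DocumentProcessor(config, catalog)

record = AirbyteRecordMessage(
    stream="articles",
    data={"body": "Some long article text ...", "author": "jane"},
    emitted_at=0,
)
chunks, id_to_delete = processor.process(record)
# chunks holds the pieces to embed and index; id_to_delete is only set for
# append_dedup streams where an existing document should be replaced.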

DocumentProcessor(config: airbyte_cdk.destinations.vector_db_based.ProcessingConfigModel, catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog)
streams: Mapping[str, airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteStream]
@staticmethod
def check_config(config: airbyte_cdk.destinations.vector_db_based.ProcessingConfigModel) -> Optional[str]:
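Because each separator is parsed with json.loads, it must be a JSON string literal in double quotes; escape sequences such as newlines are written the way they would appear inside a JSON document. An illustration of the same acceptance logic that check_config applies:

import json

candidates = [
    '"\\n\\n"',   # valid: JSON string containing two newlines
    '"---"',      # valid: plain JSON string
    "'\\n\\n'",   # invalid: single quotes are not valid JSON
    "42",         # invalid: parses as a number, not a string
]

for s in candidates:
    try:
        ok = isinstance(json.loads(s), str)
    except json.decoder.JSONDecodeError:
        ok = False
    print(f"{s!r}: {'accepted' if ok else 'rejected'}")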
splitter
text_fields
metadata_fields
field_name_mappings
logger
def process(self, record: airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteRecordMessage) -> Tuple[List[Chunk], Optional[str]]:

Generate documents from a record.

Parameters
  • record: the AirbyteRecordMessage to process
Returns

Tuple of (list of document chunks, record id to delete if the stream is in dedup mode, to avoid stale documents in the vector store)
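A hedged sketch of how a custom writer might consume this return value; the indexer object and its delete/index methods are hypothetical stand-ins for a concrete vector store client:

from airbyte_cdk.destinations.vector_db_based.document_processor import DocumentProcessor
from airbyte_cdk.models import AirbyteRecordMessage


def write_record(processor: DocumentProcessor, indexer, record: AirbyteRecordMessage) -> None:
    chunks, id_to_delete = processor.process(record)
    if id_to_delete is not None:
        # Either the stream is in append_dedup mode or this record is a CDC deletion:
        # remove the stale document before indexing the fresh chunks.
        indexer.delete([id_to_delete])
    if chunks:
        indexer.index(chunks)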