airbyte_cdk.destinations.vector_db_based.document_processor
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import json
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping, Optional, Tuple

import dpath
from langchain.text_splitter import Language, RecursiveCharacterTextSplitter
from langchain.utils import stringify_dict
from langchain_core.documents.base import Document

from airbyte_cdk.destinations.vector_db_based.config import (
    ProcessingConfigModel,
    SeparatorSplitterConfigModel,
    TextSplitterConfigModel,
)
from airbyte_cdk.destinations.vector_db_based.utils import create_stream_identifier
from airbyte_cdk.models import (
    AirbyteRecordMessage,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
)
from airbyte_cdk.utils.traced_exception import AirbyteTracedException, FailureType

METADATA_STREAM_FIELD = "_ab_stream"
METADATA_RECORD_ID_FIELD = "_ab_record_id"

CDC_DELETED_FIELD = "_ab_cdc_deleted_at"


@dataclass
class Chunk:
    page_content: Optional[str]
    metadata: Dict[str, Any]
    record: AirbyteRecordMessage
    embedding: Optional[List[float]] = None


headers_to_split_on = [
    "(?:^|\n)# ",
    "(?:^|\n)## ",
    "(?:^|\n)### ",
    "(?:^|\n)#### ",
    "(?:^|\n)##### ",
    "(?:^|\n)###### ",
]


class DocumentProcessor:
    """
    DocumentProcessor is a helper class that generates documents from Airbyte records.

    It is used to generate documents from records before writing them to the destination:
    * The text fields are extracted from the record and concatenated to a single string.
    * The metadata fields are extracted from the record and added to the document metadata.
    * The document is split into chunks of a given size using a langchain text splitter.

    The Writer class uses the DocumentProcessor class to internally generate documents from records - in most cases you don't need to use it directly,
    except if you want to implement a custom writer.

    The config parameters specified by the ProcessingConfigModel has to be made part of the connector spec to allow the user to configure the document processor.
    Calling DocumentProcessor.check_config(config) will validate the config and return an error message if the config is invalid.
    """

    streams: Mapping[str, ConfiguredAirbyteStream]

    @staticmethod
    def check_config(config: ProcessingConfigModel) -> Optional[str]:
        if config.text_splitter is not None and config.text_splitter.mode == "separator":
            for s in config.text_splitter.separators:
                try:
                    separator = json.loads(s)
                    if not isinstance(separator, str):
                        return f"Invalid separator: {s}. Separator needs to be a valid JSON string using double quotes."
                except json.decoder.JSONDecodeError:
                    return f"Invalid separator: {s}. Separator needs to be a valid JSON string using double quotes."
        return None

    def _get_text_splitter(
        self,
        chunk_size: int,
        chunk_overlap: int,
        splitter_config: Optional[TextSplitterConfigModel],
    ) -> RecursiveCharacterTextSplitter:
        if splitter_config is None:
            splitter_config = SeparatorSplitterConfigModel(mode="separator")
        if splitter_config.mode == "separator":
            return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                separators=[json.loads(s) for s in splitter_config.separators],
                keep_separator=splitter_config.keep_separator,
                disallowed_special=(),
            )
        if splitter_config.mode == "markdown":
            return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                separators=headers_to_split_on[: splitter_config.split_level],
                is_separator_regex=True,
                keep_separator=True,
                disallowed_special=(),
            )
        if splitter_config.mode == "code":
            return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                separators=RecursiveCharacterTextSplitter.get_separators_for_language(
                    Language(splitter_config.language)
                ),
                disallowed_special=(),
            )

    def __init__(self, config: ProcessingConfigModel, catalog: ConfiguredAirbyteCatalog):
        self.streams = {
            create_stream_identifier(stream.stream): stream for stream in catalog.streams
        }

        self.splitter = self._get_text_splitter(
            config.chunk_size, config.chunk_overlap, config.text_splitter
        )
        self.text_fields = config.text_fields
        self.metadata_fields = config.metadata_fields
        self.field_name_mappings = config.field_name_mappings
        self.logger = logging.getLogger("airbyte.document_processor")

    def process(self, record: AirbyteRecordMessage) -> Tuple[List[Chunk], Optional[str]]:
        """
        Generate documents from records.
        :param records: List of AirbyteRecordMessages
        :return: Tuple of (List of document chunks, record id to delete if a stream is in dedup mode to avoid stale documents in the vector store)
        """
        if CDC_DELETED_FIELD in record.data and record.data[CDC_DELETED_FIELD]:
            return [], self._extract_primary_key(record)
        doc = self._generate_document(record)
        if doc is None:
            text_fields = ", ".join(self.text_fields) if self.text_fields else "all fields"
            raise AirbyteTracedException(
                internal_message="No text fields found in record",
                message=f"Record {str(record.data)[:250]}... does not contain any of the configured text fields: {text_fields}. Please check your processing configuration, there has to be at least one text field set in each record.",
                failure_type=FailureType.config_error,
            )
        chunks = [
            Chunk(
                page_content=chunk_document.page_content,
                metadata=chunk_document.metadata,
                record=record,
            )
            for chunk_document in self._split_document(doc)
        ]
        id_to_delete = (
            doc.metadata[METADATA_RECORD_ID_FIELD]
            if METADATA_RECORD_ID_FIELD in doc.metadata
            else None
        )
        return chunks, id_to_delete

    def _generate_document(self, record: AirbyteRecordMessage) -> Optional[Document]:
        relevant_fields = self._extract_relevant_fields(record, self.text_fields)
        if len(relevant_fields) == 0:
            return None
        text = stringify_dict(relevant_fields)
        metadata = self._extract_metadata(record)
        return Document(page_content=text, metadata=metadata)

    def _extract_relevant_fields(
        self, record: AirbyteRecordMessage, fields: Optional[List[str]]
    ) -> Dict[str, Any]:
        relevant_fields = {}
        if fields and len(fields) > 0:
            for field in fields:
                values = dpath.values(record.data, field, separator=".")
                if values and len(values) > 0:
                    relevant_fields[field] = values if len(values) > 1 else values[0]
        else:
            relevant_fields = record.data
        return self._remap_field_names(relevant_fields)

    def _extract_metadata(self, record: AirbyteRecordMessage) -> Dict[str, Any]:
        metadata = self._extract_relevant_fields(record, self.metadata_fields)
        metadata[METADATA_STREAM_FIELD] = create_stream_identifier(record)
        primary_key = self._extract_primary_key(record)
        if primary_key:
            metadata[METADATA_RECORD_ID_FIELD] = primary_key
        return metadata

    def _extract_primary_key(self, record: AirbyteRecordMessage) -> Optional[str]:
        stream_identifier = create_stream_identifier(record)
        current_stream: ConfiguredAirbyteStream = self.streams[stream_identifier]
        # if the sync mode is deduping, use the primary key to upsert existing records instead of appending new ones
        if (
            not current_stream.primary_key
            or current_stream.destination_sync_mode != DestinationSyncMode.append_dedup
        ):
            return None

        primary_key = []
        for key in current_stream.primary_key:
            try:
                primary_key.append(str(dpath.get(record.data, key)))
            except KeyError:
                primary_key.append("__not_found__")
        stringified_primary_key = "_".join(primary_key)
        return f"{stream_identifier}_{stringified_primary_key}"

    def _split_document(self, doc: Document) -> List[Document]:
        chunks: List[Document] = self.splitter.split_documents([doc])
        return chunks

    def _remap_field_names(self, fields: Dict[str, Any]) -> Dict[str, Any]:
        if not self.field_name_mappings:
            return fields

        new_fields = fields.copy()
        for mapping in self.field_name_mappings:
            if mapping.from_field in new_fields:
                new_fields[mapping.to_field] = new_fields.pop(mapping.from_field)

        return new_fields
DocumentProcessor is a helper class that generates documents from Airbyte records.
It is used to generate documents from records before writing them to the destination:
- The text fields are extracted from the record and concatenated into a single string.
- The metadata fields are extracted from the record and added to the document metadata.
- The document is split into chunks of a given size using a langchain text splitter.
The Writer class uses the DocumentProcessor class internally to generate documents from records; in most cases you don't need to use it directly, unless you want to implement a custom writer.
The config parameters specified by the ProcessingConfigModel have to be made part of the connector spec to allow the user to configure the document processor. Calling DocumentProcessor.check_config(config) will validate the config and return an error message if the config is invalid.
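As a rough sketch of how a custom writer might wire this up, the processing config can be validated with check_config before constructing the processor. The stream name, field names, and config values below are invented for illustration; only the class and model names come from this module and the Airbyte protocol models.

from airbyte_cdk.destinations.vector_db_based.config import (
    ProcessingConfigModel,
    SeparatorSplitterConfigModel,
)
from airbyte_cdk.destinations.vector_db_based.document_processor import DocumentProcessor
from airbyte_cdk.models import (
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
)

# Hypothetical processing config: split on paragraphs and sentences.
# Each separator must be a JSON-encoded string using double quotes,
# which is exactly what check_config verifies.
config = ProcessingConfigModel(
    chunk_size=512,
    chunk_overlap=50,
    text_fields=["title", "body"],
    metadata_fields=["author"],
    text_splitter=SeparatorSplitterConfigModel(
        mode="separator",
        separators=['"\\n\\n"', '"."'],
        keep_separator=False,
    ),
)

error = DocumentProcessor.check_config(config)
if error is not None:
    raise ValueError(error)

# Minimal single-stream catalog; the "articles" stream and its primary key
# are made up for this example.
catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=AirbyteStream(
                name="articles",
                json_schema={},
                supported_sync_modes=[SyncMode.incremental],
            ),
            sync_mode=SyncMode.incremental,
            destination_sync_mode=DestinationSyncMode.append_dedup,
            primary_key=[["id"]],
        )
    ]
)

processor = DocumentProcessor(config, catalog)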
DocumentProcessor.process(record) generates document chunks from a single record.
Parameters
- record: the AirbyteRecordMessage to process
Returns
Tuple of (list of document chunks, id of the record to delete if the stream is in dedup mode, to avoid stale documents in the vector store)
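Continuing the sketch above with an invented record (the data values are illustrative, not from this module), a writer might handle the returned chunks and deletion id like this:

from airbyte_cdk.models import AirbyteRecordMessage

# Hypothetical record from the "articles" stream configured above.
record = AirbyteRecordMessage(
    stream="articles",
    data={"id": 1, "title": "Hello", "body": "Some longer text", "author": "Jane"},
    emitted_at=1700000000000,
)

chunks, id_to_delete = processor.process(record)

if id_to_delete is not None:
    # In dedup mode, remove previously indexed chunks for this record first
    # so the vector store does not keep stale documents.
    pass  # e.g. indexer.delete([id_to_delete]) in a hypothetical indexer

for chunk in chunks:
    # chunk.page_content holds the concatenated text of the configured text
    # fields; chunk.metadata includes "_ab_stream" and, in dedup mode,
    # "_ab_record_id".
    pass  # e.g. embed chunk.page_content and write it together with chunk.metadata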