Source code for langchain_memvid.encoder

"""
Encodes text chunks and metadata as QR codes in video frames for MemVid.

- Adds new documents and builds video storage with QR codes.
- Maintains mapping between document IDs and video frames for efficient deletion.
"""

import orjson
import time
from pathlib import Path
from typing import List, Dict, Any, Optional

from .exceptions import EncodingError
from .video import VideoProcessor
from .index import IndexManager
from .config import VectorStoreConfig, LANGCHAIN_MEMVID_DEFAULT_VIDEO_FILE, LANGCHAIN_MEMVID_DEFAULT_INDEX_DIR
from .logging import get_logger
from .types import BuildStats

logger = get_logger("encoder")


[docs] class Encoder: """Encodes text chunks and metadata as QR codes in video frames for MemVid. - Adds new documents and builds video storage with QR codes. - Maintains mapping between document IDs and video frames for efficient deletion. """
[docs] def __init__( self, config: VectorStoreConfig, index_manager: IndexManager, ): """Initialize the encoder. Args: config: Configuration for the encoder index_manager: Index manager for storing embeddings Example: >>> config = VectorStoreConfig(...) >>> index_manager = IndexManager(...) >>> encoder = Encoder(config, index_manager) """ self.config = config self.index_manager = index_manager self.video_processor = VideoProcessor( video_config=config.video, qrcode_config=config.qrcode ) self._chunks: List[Dict[str, Any]] = []
[docs] def add_chunks( self, texts: List[str], metadatas: Optional[List[Dict[str, Any]]] = None, ): """Add text chunks for encoding. Args: texts: List of text chunks to encode metadatas: Optional list of metadata dictionaries for each chunk Raises: EncodingError: If adding chunks fails Example: >>> encoder.add_chunks(["text1", "text2"], [{"source": "doc1"}, {"source": "doc2"}]) """ try: if metadatas is None: metadatas = [{} for _ in texts] if len(texts) != len(metadatas): raise EncodingError("Number of texts must match number of metadata entries") # Add chunks with metadata for text, metadata in zip(texts, metadatas): self._chunks.append({ "text": text, "metadata": metadata }) logger.info(f"Added {len(texts)} chunks for encoding") except Exception as e: raise EncodingError(f"Failed to add chunks: {str(e)}")
[docs] def build_video( self, output_file: Path = LANGCHAIN_MEMVID_DEFAULT_VIDEO_FILE, index_dir: Path = LANGCHAIN_MEMVID_DEFAULT_INDEX_DIR, ) -> BuildStats: """Build video from added chunks. This method implements the hybrid storage approach and optimization strategies for efficient video building and frame mapping. Hybrid Storage Implementation - Essential Metadata: Stores only essential metadata in FAISS for efficiency - Document text, source, category, doc_id, metadata_hash - Significant reduction in FAISS index size - Fast search operations with minimal memory usage - Full Metadata: Stores complete metadata in video QR codes - All metadata fields and custom attributes - Complete backup and archive functionality - On-demand retrieval when needed - Metadata is stored in the video QR codes Optimization Strategies Frame Index Mapping - Bidirectional Mapping: Establishes doc_id frame_number mapping - O(1) Lookup: Enables constant-time frame number retrieval - Deletion Optimization: Allows precise frame-level deletion without full video rebuilds - Consistency: Maintains synchronization between FAISS index and video frames Performance Characteristics - Encoding Time: Optimized for large document collections - Memory Usage: Efficient processing of chunks and frames - Storage Efficiency: Hybrid approach reduces overall storage requirements - Quality: Maintains video quality while optimizing storage Process Flow 1. Text Processing: Extract texts from chunks for FAISS indexing 2. FAISS Indexing: Add essential metadata to FAISS index 3. QR Code Generation: Create QR codes with full metadata 4. Frame Mapping: Establish bidirectional document-to-frame mapping 5. Video Encoding: Encode QR codes into video frames 6. Index Saving: Save FAISS index with frame mappings Args: output_file: Path to save the video file, defaults to LANGCHAIN_MEMVID_DEFAULT_VIDEO_FILE index_dir: Path to save the index directory, defaults to LANGCHAIN_MEMVID_DEFAULT_INDEX_DIR Returns: BuildStats: Statistics for the video build process including: - total_chunks: Number of chunks encoded - video_size_mb: Size of the video file in MB - encoding_time: Time taken for encoding in seconds - index_path: Path to the saved index - video_path: Path to the saved video Raises: EncodingError: If video building fails ValueError: If output paths are invalid Example: # Build video with hybrid storage approach stats = encoder.build_video(Path("output.mp4"), Path("index.d")) print(f"Encoded {stats.total_chunks} chunks in {stats.encoding_time:.2f}s") print(f"Video size: {stats.video_size_mb:.2f} MB") # Check frame mapping statistics frame_stats = encoder.index_manager.get_frame_mapping_stats() print(f"Frame mapping coverage: {frame_stats['mapping_coverage']:.1f}%") """ try: # Validate paths if not output_file.parent.exists(): raise ValueError(f"Output directory does not exist: {output_file.parent}") if not index_dir.parent.exists(): raise ValueError(f"Index directory does not exist: {index_dir.parent}") if not self._chunks: raise EncodingError("No chunks to encode") start_time = time.time() # Get texts from chunks texts = [chunk["text"] for chunk in self._chunks] # Add vectors to index self.index_manager.add_texts( texts=texts, metadata=self._chunks ) # Create QR codes for each chunk qr_frames = [] for i, chunk in enumerate(self._chunks): # Convert chunk to JSON string chunk_data = orjson.dumps(chunk, option=orjson.OPT_NON_STR_KEYS).decode() # Create QR code qr_frame = self.video_processor.create_qr_code(chunk_data) qr_frames.append(qr_frame) # Set frame mapping for efficient deletion self.index_manager.set_frame_mapping(i, len(qr_frames) - 1) # Encode video self.video_processor.encode_video( frames=qr_frames, output_path=output_file ) # Save index index_dir = index_dir.with_suffix('.d') self.index_manager.save(index_dir) # Calculate statistics encoding_time = time.time() - start_time stats: BuildStats = BuildStats( total_chunks=len(self._chunks), video_size_mb=output_file.stat().st_size / (1024 * 1024), encoding_time=encoding_time, index_path=index_dir, video_path=output_file ) # Clear chunks after successful build self._chunks = [] logger.info(f"Built video with {stats.total_chunks} chunks in {stats.encoding_time:.2f}s") return stats except Exception as e: raise EncodingError(f"Failed to build video: {str(e)}")
[docs] def clear(self): """Clear all added chunks.""" self._chunks = [] logger.info("Cleared all chunks")