-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathingestion.py
More file actions
125 lines (106 loc) · 4.65 KB
/
ingestion.py
File metadata and controls
125 lines (106 loc) · 4.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from uuid import uuid4
from common_utils import load_yaml_config
from langchain_ollama import OllamaEmbeddings
import os
class DocumentIngestion:
    """Pipeline for loading PDF documents, chunking them, and storing the
    chunks in a persistent Chroma vector store using Ollama embeddings.

    All tunables (embedding model, chunk size/overlap/separators, collection
    name, persist directory) are read from a YAML configuration file.
    Errors throughout are handled best-effort: they are printed to stdout and
    signalled via return values (empty list / None / False) rather than raised.
    """
    def __init__(self, config_path: str = "config.yaml"):
        """Initialize the DocumentIngestion class with configuration.

        Args:
            config_path: Path to the YAML config file. Must provide the
                "ollama", "ingestion", and "vector_store" sections read below.
        """
        self.config = load_yaml_config(config_path)
        # Embedding model name comes from the "ollama" section of the config.
        self.embeddings = OllamaEmbeddings(model=self.config["ollama"]["embedding_model"])
        # Created lazily by init_vector_store() / get_vector_store().
        self.vector_store = None
    def load_documents(self, doc_paths):
        """Load documents from given paths.

        Args:
            doc_paths: Iterable of file paths, each loaded with PyMuPDFLoader
                (i.e. expected to be PDF files — confirm with callers).

        Returns:
            list: All successfully loaded Document objects. Paths that fail
            to load are reported to stdout and skipped, so a partial result
            is possible.
        """
        all_docs = []
        for doc_path in doc_paths:
            try:
                loader = PyMuPDFLoader(doc_path)
                docs = loader.load()
                all_docs.extend(docs)
                print(f"Successfully loaded document: {doc_path}")
            except Exception as e:
                # Best-effort: one bad file must not abort the whole batch.
                print(f"Error loading document {doc_path}: {str(e)}")
        return all_docs
    def chunk_documents(self, docs):
        """Split documents into chunks based on configuration.

        Chunk size, overlap, and separators come from the "ingestion"
        config section; add_start_index=True records each chunk's start
        offset in its metadata.

        Args:
            docs: Sequence of loaded Document objects.

        Returns:
            list: The resulting chunks, or an empty list on any error.
        """
        try:
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=self.config["ingestion"]["chunk_size"],
                chunk_overlap=self.config["ingestion"]["chunk_overlap"],
                separators=self.config["ingestion"]["separators"],
                add_start_index=True
            )
            chunks = text_splitter.split_documents(docs)
            print(f"Successfully created {len(chunks)} chunks from {len(docs)} documents")
            return chunks
        except Exception as e:
            print(f"Error chunking documents: {str(e)}")
            return []
    def init_vector_store(self):
        """Initialize the vector store.

        Builds a persistent Chroma collection using this instance's
        embeddings and the "vector_store" config section, and caches it on
        self.vector_store.

        Returns:
            The Chroma instance, or None if initialization failed.
        """
        try:
            self.vector_store = Chroma(
                collection_name=self.config["vector_store"]["collection_name"],
                embedding_function=self.embeddings,
                persist_directory=self.config["vector_store"]["persist_directory"]
            )
            print(f"Successfully initialized vector store with collection: {self.config['vector_store']['collection_name']}")
            return self.vector_store
        except Exception as e:
            print(f"Error initializing vector store: {str(e)}")
            return None
    def add_documents_to_store(self, chunks):
        """Add document chunks to the vector store.

        Lazily initializes the vector store if needed. Each chunk is given a
        freshly generated UUID4 id, so repeated calls with the same chunks
        create duplicate entries rather than upserting.

        Args:
            chunks: Sequence of Document chunks to ingest.

        Returns:
            bool: True if all chunks were added, False on any failure.
        """
        try:
            if self.vector_store is None:
                self.init_vector_store()
            if self.vector_store is None:
                # init_vector_store() reported the underlying error already.
                print("Error: Could not initialize vector store")
                return False
            print(f"Ingesting {len(chunks)} documents to vector store")
            uuids = [str(uuid4()) for _ in range(len(chunks))]
            self.vector_store.add_documents(documents=chunks, ids=uuids)
            print(f"Successfully added {len(chunks)} documents to vector store")
            return True
        except Exception as e:
            print(f"Error adding documents to vector store: {str(e)}")
            return False
    def process_documents(self, doc_paths):
        """Complete pipeline to process documents from paths to vector store.

        Runs load -> chunk -> store, stopping early (with a message) if any
        stage produces nothing.

        Args:
            doc_paths: Iterable of document file paths.

        Returns:
            bool: True if documents made it into the vector store, else False.
        """
        try:
            # Load documents
            docs = self.load_documents(doc_paths)
            if not docs:
                print("No documents loaded successfully")
                return False
            # Chunk documents
            chunks = self.chunk_documents(docs)
            if not chunks:
                print("No chunks created from documents")
                return False
            # Add to vector store
            return self.add_documents_to_store(chunks)
        except Exception as e:
            print(f"Error in document processing pipeline: {str(e)}")
            return False
    def get_vector_store(self):
        """Get the vector store instance.

        Lazily initializes it on first access.

        Returns:
            The cached Chroma instance, or None if initialization failed.
        """
        if self.vector_store is None:
            self.init_vector_store()
        return self.vector_store
# Backward compatibility functions
# Module-level config kept for legacy callers that import `config` directly.
# NOTE(review): the wrapper functions below do NOT read this object — each
# constructs its own DocumentIngestion (re-loading the config); this line's
# only effect is the import-time load side effect.
config = load_yaml_config()
def load_documents(doc_paths):
    """Backward-compatible wrapper: load documents via a fresh DocumentIngestion."""
    return DocumentIngestion().load_documents(doc_paths)
def chunk_documents(docs):
    """Backward-compatible wrapper: chunk documents via a fresh DocumentIngestion."""
    return DocumentIngestion().chunk_documents(docs)
def init_vector_store(embeddings=None):
    """Backward-compatible wrapper around DocumentIngestion.init_vector_store.

    Args:
        embeddings: Optional embedding function. Previously this argument was
            accepted but silently ignored; now, when provided, it replaces the
            config-derived OllamaEmbeddings on the fresh instance. Passing
            None (the default) preserves the old behavior exactly.

    Returns:
        The initialized Chroma vector store, or None on failure.
    """
    ingestion = DocumentIngestion()
    if embeddings is not None:
        # Honor the caller-supplied embeddings instead of dropping them;
        # init_vector_store() reads self.embeddings when building Chroma.
        ingestion.embeddings = embeddings
    return ingestion.init_vector_store()
def add_docs_to_store(chunks):
    """Backward-compatible wrapper: ingest chunks via a fresh DocumentIngestion."""
    return DocumentIngestion().add_documents_to_store(chunks)