kotaemon/knowledgehub/indices/vectorindex.py
Duc Nguyen (john) e34b1e4c6d Refactor the index component and update the MVP insurance accordingly (#90)
Refactor the `kotaemon/pipelines` module to `kotaemon/indices`. Create the VectorIndex.

Note: currently I place `qa` to be inside `kotaemon/indices` since at the moment we only have `qa` in RAG. At the same time, I think `qa` can be an independent module in `kotaemon/qa`. Since this can be changed later, I still go at the 1st option for now to observe if we can change it later.
2023-11-30 18:35:07 +07:00

186 lines
5.8 KiB
Python

from __future__ import annotations
import uuid
from pathlib import Path
from typing import Optional, Sequence, cast
from kotaemon.base import BaseComponent, Document, RetrievedDocument
from kotaemon.embeddings import BaseEmbeddings
from kotaemon.storages import BaseDocumentStore, BaseVectorStore
from .base import BaseIndexing, BaseRetrieval
from .rankings import BaseReranking
VECTOR_STORE_FNAME = "vectorstore"
DOC_STORE_FNAME = "docstore"
class VectorIndexing(BaseIndexing):
"""Ingest the document, run through the embedding, and store the embedding in a
vector store.
This pipeline supports the following set of inputs:
- List of documents
- List of texts
"""
vector_store: BaseVectorStore
doc_store: Optional[BaseDocumentStore] = None
embedding: BaseEmbeddings
def to_retrieval_pipeline(self, *args, **kwargs):
"""Convert the indexing pipeline to a retrieval pipeline"""
return VectorRetrieval(
vector_store=self.vector_store,
doc_store=self.doc_store,
embedding=self.embedding,
**kwargs,
)
def to_qa_pipeline(self, *args, **kwargs):
from .qa import CitationQAPipeline
return TextVectorQA(
retrieving_pipeline=self.to_retrieval_pipeline(**kwargs),
qa_pipeline=CitationQAPipeline(**kwargs),
)
def run(self, text: str | list[str] | Document | list[Document]) -> None:
input_: list[Document] = []
if not isinstance(text, list):
text = [text]
for item in cast(list, text):
if isinstance(item, str):
input_.append(Document(text=item, id_=str(uuid.uuid4())))
elif isinstance(item, Document):
input_.append(item)
else:
raise ValueError(
f"Invalid input type {type(item)}, should be str or Document"
)
embeddings = self.embedding(input_)
self.vector_store.add(
embeddings=embeddings,
ids=[t.id_ for t in input_],
)
if self.doc_store:
self.doc_store.add(input_)
def save(
self,
path: str | Path,
vectorstore_fname: str = VECTOR_STORE_FNAME,
docstore_fname: str = DOC_STORE_FNAME,
):
"""Save the whole state of the indexing pipeline vector store and all
necessary information to disk
Args:
path (str): path to save the state
"""
if isinstance(path, str):
path = Path(path)
self.vector_store.save(path / vectorstore_fname)
if self.doc_store:
self.doc_store.save(path / docstore_fname)
def load(
self,
path: str | Path,
vectorstore_fname: str = VECTOR_STORE_FNAME,
docstore_fname: str = DOC_STORE_FNAME,
):
"""Load all information from disk to an object"""
if isinstance(path, str):
path = Path(path)
self.vector_store.load(path / vectorstore_fname)
if self.doc_store:
self.doc_store.load(path / docstore_fname)
class VectorRetrieval(BaseRetrieval):
"""Retrieve list of documents from vector store"""
vector_store: BaseVectorStore
doc_store: Optional[BaseDocumentStore] = None
embedding: BaseEmbeddings
rerankers: Sequence[BaseReranking] = []
top_k: int = 1
def run(
self, text: str | Document, top_k: Optional[int] = None, **kwargs
) -> list[RetrievedDocument]:
"""Retrieve a list of documents from vector store
Args:
text: the text to retrieve similar documents
top_k: number of top similar documents to return
Returns:
list[RetrievedDocument]: list of retrieved documents
"""
if top_k is None:
top_k = self.top_k
if self.doc_store is None:
raise ValueError(
"doc_store is not provided. Please provide a doc_store to "
"retrieve the documents"
)
emb: list[float] = self.embedding(text)[0].embedding
_, scores, ids = self.vector_store.query(embedding=emb, top_k=top_k)
docs = self.doc_store.get(ids)
result = [
RetrievedDocument(**doc.to_dict(), score=score)
for doc, score in zip(docs, scores)
]
# use additional reranker to re-order the document list
if self.rerankers:
for reranker in self.rerankers:
result = reranker(documents=result, query=text)
return result
def save(
self,
path: str | Path,
vectorstore_fname: str = VECTOR_STORE_FNAME,
docstore_fname: str = DOC_STORE_FNAME,
):
"""Save the whole state of the indexing pipeline vector store and all
necessary information to disk
Args:
path (str): path to save the state
"""
if isinstance(path, str):
path = Path(path)
self.vector_store.save(path / vectorstore_fname)
if self.doc_store:
self.doc_store.save(path / docstore_fname)
def load(
self,
path: str | Path,
vectorstore_fname: str = VECTOR_STORE_FNAME,
docstore_fname: str = DOC_STORE_FNAME,
):
"""Load all information from disk to an object"""
if isinstance(path, str):
path = Path(path)
self.vector_store.load(path / vectorstore_fname)
if self.doc_store:
self.doc_store.load(path / docstore_fname)
class TextVectorQA(BaseComponent):
retrieving_pipeline: BaseRetrieval
qa_pipeline: BaseComponent
def run(self, question, **kwargs):
retrieved_documents = self.retrieving_pipeline(question, **kwargs)
return self.qa_pipeline(question, retrieved_documents, **kwargs)