From 620b2b03ca4d22b16b881755191d27a199521b43 Mon Sep 17 00:00:00 2001
From: "Nguyen Trung Duc (john)"
Date: Thu, 14 Sep 2023 14:18:20 +0700
Subject: [PATCH] [AUR-392, AUR-413, AUR-414] Define base vector store, and
 make use of ChromaVectorStore from llama_index. Indexing and retrieving
 vectors with vector store (#18)

Design the base interface of the vector store, and apply it to the Chroma
Vector Store (wrapped around llama_index's implementation). Provide the
pipelines to populate and retrieve from the vector store.
---
 knowledgehub/embeddings/__init__.py   |   4 +
 knowledgehub/embeddings/base.py       |  47 +++++---
 knowledgehub/pipelines/indexing.py    |  58 ++++++++++
 knowledgehub/pipelines/retrieving.py  |  58 ++++++++++
 knowledgehub/vectorstores/__init__.py |   4 +
 knowledgehub/vectorstores/base.py     | 154 ++++++++++++++++++++++++++
 knowledgehub/vectorstores/chroma.py   |  56 ++++++++++
 setup.py                              |   1 +
 tests/test_indexing_retrieval.py      |  59 ++++++++++
 tests/test_vectorstore.py             |  61 ++++++++++
 10 files changed, 486 insertions(+), 16 deletions(-)
 create mode 100644 knowledgehub/pipelines/indexing.py
 create mode 100644 knowledgehub/pipelines/retrieving.py
 create mode 100644 knowledgehub/vectorstores/__init__.py
 create mode 100644 knowledgehub/vectorstores/base.py
 create mode 100644 knowledgehub/vectorstores/chroma.py
 create mode 100644 tests/test_indexing_retrieval.py
 create mode 100644 tests/test_vectorstore.py

diff --git a/knowledgehub/embeddings/__init__.py b/knowledgehub/embeddings/__init__.py
index e69de29..4d32d13 100644
--- a/knowledgehub/embeddings/__init__.py
+++ b/knowledgehub/embeddings/__init__.py
@@ -0,0 +1,4 @@
+from .base import BaseEmbeddings
+from .openai import AzureOpenAIEmbeddings, OpenAIEmbeddings
+
+__all__ = ["BaseEmbeddings", "OpenAIEmbeddings", "AzureOpenAIEmbeddings"]
diff --git a/knowledgehub/embeddings/base.py b/knowledgehub/embeddings/base.py
index faa1686..c3b2034 100644
--- a/knowledgehub/embeddings/base.py
+++ b/knowledgehub/embeddings/base.py
@@ -1,3 +1,4 @@
+from abc import abstractmethod
 from typing import List, Type
 
 from langchain.embeddings.base import Embeddings as LCEmbeddings
@@ -7,11 +8,37 @@ from ..components import BaseComponent
 from ..documents.base import Document
 
 
-class Embeddings(BaseComponent):
-    ...
+class BaseEmbeddings(BaseComponent):
+    @abstractmethod
+    def run_raw(self, text: str) -> List[float]:
+        ...
+
+    @abstractmethod
+    def run_batch_raw(self, text: List[str]) -> List[List[float]]:
+        ...
+
+    @abstractmethod
+    def run_document(self, text: Document) -> List[float]:
+        ...
+
+    @abstractmethod
+    def run_batch_document(self, text: List[Document]) -> List[List[float]]:
+        ...
+
+    def is_document(self, text) -> bool:
+        if isinstance(text, Document):
+            return True
+        elif isinstance(text, List) and isinstance(text[0], Document):
+            return True
+        return False
+
+    def is_batch(self, text) -> bool:
+        if isinstance(text, list):
+            return True
+        return False
 
 
-class LangchainEmbeddings(Embeddings):
+class LangchainEmbeddings(BaseEmbeddings):
     _lc_class: Type[LCEmbeddings]
 
     def __init__(self, **params):
@@ -46,17 +73,5 @@ class LangchainEmbeddings(Embeddings):
     def run_document(self, text: Document) -> List[float]:
         return self.agent.embed_query(text.text)  # type: ignore
 
-    def run_batch_document(self, text: List[Document]):
+    def run_batch_document(self, text: List[Document]) -> List[List[float]]:
         return self.agent.embed_documents([each.text for each in text])  # type: ignore
-
-    def is_document(self, text) -> bool:
-        if isinstance(text, Document):
-            return True
-        elif isinstance(text, List) and isinstance(text[0], Document):
-            return True
-        return False
-
-    def is_batch(self, text) -> bool:
-        if isinstance(text, list):
-            return True
-        return False
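
The four `run_*` variants above are the whole contract a concrete embedding has
to satisfy; the `is_document`/`is_batch` helpers exist so the surrounding
`BaseComponent` machinery can presumably route a call to the matching variant.
A minimal sketch of a custom subclass (the class name and the toy length-based
vectors are invented for illustration; they are not part of this patch):

    from typing import List

    from kotaemon.documents.base import Document
    from kotaemon.embeddings import BaseEmbeddings


    class CharCountEmbeddings(BaseEmbeddings):
        """Toy embedding: maps text to a deterministic 2-d vector."""

        def run_raw(self, text: str) -> List[float]:
            # [total characters, number of spaces] -- offline and reproducible
            return [float(len(text)), float(text.count(" "))]

        def run_batch_raw(self, text: List[str]) -> List[List[float]]:
            return [self.run_raw(each) for each in text]

        def run_document(self, text: Document) -> List[float]:
            return self.run_raw(text.text)

        def run_batch_document(self, text: List[Document]) -> List[List[float]]:
            return [self.run_document(each) for each in text]
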
diff --git a/knowledgehub/pipelines/indexing.py b/knowledgehub/pipelines/indexing.py
new file mode 100644
index 0000000..86d7cb5
--- /dev/null
+++ b/knowledgehub/pipelines/indexing.py
@@ -0,0 +1,58 @@
+from typing import List
+
+from theflow import Node, Param
+
+from ..components import BaseComponent
+from ..documents.base import Document
+from ..embeddings import BaseEmbeddings
+from ..vectorstores import BaseVectorStore
+
+
+class IndexVectorStoreFromDocumentPipeline(BaseComponent):
+    """Ingest the document, run through the embedding, and store the embedding in a
+    vector store.
+
+    This pipeline supports the following set of inputs:
+        - List of documents
+        - List of texts
+    """
+
+    vector_store: Param[BaseVectorStore] = Param()
+    embedding: Node[BaseEmbeddings] = Node()
+    # TODO: populate to document store as well when it's finished
+    # TODO: refer to llama_index's storage as well
+
+    def run_raw(self, text: str) -> None:
+        self.vector_store.add([self.embedding(text)])
+
+    def run_batch_raw(self, text: List[str]) -> None:
+        self.vector_store.add(self.embedding(text))
+
+    def run_document(self, text: Document) -> None:
+        self.vector_store.add([self.embedding(text)])
+
+    def run_batch_document(self, text: List[Document]) -> None:
+        self.vector_store.add(self.embedding(text))
+
+    def is_document(self, text) -> bool:
+        if isinstance(text, Document):
+            return True
+        elif isinstance(text, List) and isinstance(text[0], Document):
+            return True
+        return False
+
+    def is_batch(self, text) -> bool:
+        if isinstance(text, list):
+            return True
+        return False
+
+    def persist(self, path: str):
+        """Save the whole state of the indexing pipeline (vector store and all
+        necessary information) to disk
+
+        Args:
+            path (str): path to save the state
+        """
+
+    def load(self, path: str):
+        """Load all information from disk into the object"""
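
With the pipeline in place, indexing is a single call per input shape; the
dispatch to the right `run_*` variant is presumably handled by `BaseComponent`
via `is_document`/`is_batch`, as exercised by the tests further below. A usage
sketch (`embedding` here is assumed to be any already-configured
`BaseEmbeddings` instance):

    from kotaemon.documents.base import Document
    from kotaemon.pipelines.indexing import IndexVectorStoreFromDocumentPipeline
    from kotaemon.vectorstores import ChromaVectorStore

    index_pipeline = IndexVectorStoreFromDocumentPipeline(
        vector_store=ChromaVectorStore(path="./chroma"),
        embedding=embedding,  # assumed: any BaseEmbeddings instance
    )
    index_pipeline(text=Document(text="Hello world"))   # single document
    index_pipeline(text=["first text", "second text"])  # batch of raw strings
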
diff --git a/knowledgehub/pipelines/retrieving.py b/knowledgehub/pipelines/retrieving.py
new file mode 100644
index 0000000..a5d1606
--- /dev/null
+++ b/knowledgehub/pipelines/retrieving.py
@@ -0,0 +1,58 @@
+from typing import List
+
+from theflow import Node, Param
+
+from ..components import BaseComponent
+from ..documents.base import Document
+from ..embeddings import BaseEmbeddings
+from ..vectorstores import BaseVectorStore
+
+
+class RetrieveDocumentFromVectorStorePipeline(BaseComponent):
+    """Retrieve documents from a vector store (currently returns matched ids)"""
+
+    vector_store: Param[BaseVectorStore] = Param()
+    embedding: Node[BaseEmbeddings] = Node()
+    # TODO: populate to document store as well when it's finished
+    # TODO: refer to llama_index's storage as well
+
+    def run_raw(self, text: str) -> List[str]:
+        emb = self.embedding(text)
+        return self.vector_store.query(embedding=emb)[2]
+
+    def run_batch_raw(self, text: List[str]) -> List[List[str]]:
+        result = []
+        for each_text in text:
+            emb = self.embedding(each_text)
+            result.append(self.vector_store.query(embedding=emb)[2])
+        return result
+
+    def run_document(self, text: Document) -> List[str]:
+        return self.run_raw(text.text)
+
+    def run_batch_document(self, text: List[Document]) -> List[List[str]]:
+        input_text = [each.text for each in text]
+        return self.run_batch_raw(input_text)
+
+    def is_document(self, text) -> bool:
+        if isinstance(text, Document):
+            return True
+        elif isinstance(text, List) and isinstance(text[0], Document):
+            return True
+        return False
+
+    def is_batch(self, text) -> bool:
+        if isinstance(text, list):
+            return True
+        return False
+
+    def persist(self, path: str):
+        """Save the whole state of the retrieval pipeline (vector store and all
+        necessary information) to disk
+
+        Args:
+            path (str): path to save the state
+        """
+
+    def load(self, path: str):
+        """Load all information from disk into the object"""
diff --git a/knowledgehub/vectorstores/__init__.py b/knowledgehub/vectorstores/__init__.py
new file mode 100644
index 0000000..e6c8f0c
--- /dev/null
+++ b/knowledgehub/vectorstores/__init__.py
@@ -0,0 +1,4 @@
+from .base import BaseVectorStore
+from .chroma import ChromaVectorStore
+
+__all__ = ["BaseVectorStore", "ChromaVectorStore"]
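
Retrieval mirrors indexing, with one caveat visible in `run_raw`: it returns
element `[2]` of the `query()` tuple, i.e. the ids of the matched entries
rather than full documents (hence the TODO about the document store).
Continuing the sketch above, with the same store and embedding:

    from kotaemon.pipelines.retrieving import RetrieveDocumentFromVectorStorePipeline

    retrieval_pipeline = RetrieveDocumentFromVectorStorePipeline(
        vector_store=index_pipeline.vector_store,  # share the indexed store
        embedding=embedding,
    )
    ids = retrieval_pipeline(text="Hello world")    # List[str] of matched ids
    id_lists = retrieval_pipeline(text=["a", "b"])  # one id list per query
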
diff --git a/knowledgehub/vectorstores/base.py b/knowledgehub/vectorstores/base.py
new file mode 100644
index 0000000..3778a15
--- /dev/null
+++ b/knowledgehub/vectorstores/base.py
@@ -0,0 +1,154 @@
+from abc import ABC, abstractmethod
+from typing import Any, List, Optional, Tuple, Type, Union
+
+from llama_index.vector_stores.types import BasePydanticVectorStore
+from llama_index.vector_stores.types import VectorStore as LIVectorStore
+from llama_index.vector_stores.types import VectorStoreQuery
+
+from ..documents.base import Document
+
+
+class BaseVectorStore(ABC):
+    @abstractmethod
+    def __init__(self, *args, **kwargs):
+        ...
+
+    @abstractmethod
+    def add(
+        self,
+        embeddings: List[List[float]],
+        metadatas: Optional[List[dict]] = None,
+        ids: Optional[List[str]] = None,
+    ) -> List[str]:
+        """Add vector embeddings to the vector store
+
+        Args:
+            embeddings: List of embeddings
+            metadatas: List of metadata of the embeddings
+            ids: List of ids of the embeddings
+            kwargs: meant for vectorstore-specific parameters
+
+        Returns:
+            List of ids of the embeddings
+        """
+        ...
+
+    @abstractmethod
+    def add_from_docs(self, docs: List[Document]):
+        """Add vector embeddings to the vector store
+
+        Args:
+            docs: List of Document objects
+        """
+        ...
+
+    @abstractmethod
+    def delete(self, ids: List[str], **kwargs):
+        """Delete vector embeddings from the vector store
+
+        Args:
+            ids: List of ids of the embeddings to be deleted
+            kwargs: meant for vectorstore-specific parameters
+        """
+        ...
+
+    # @abstractmethod
+    # def update(self, *args, **kwargs):
+    #     ...
+
+    # @abstractmethod
+    # def persist(self, *args, **kwargs):
+    #     ...
+
+    # @classmethod
+    # @abstractmethod
+    # def load(self, *args, **kwargs):
+    #     ...
+
+    @abstractmethod
+    def query(
+        self,
+        embedding: List[float],
+        top_k: int = 1,
+        ids: Optional[List[str]] = None,
+    ) -> Tuple[List[List[float]], List[float], List[str]]:
+        """Return the top k most similar vector embeddings
+
+        Args:
+            embedding: List of embeddings
+            top_k: Number of most similar embeddings to return
+            ids: List of ids of the embeddings to be queried
+
+        Returns:
+            the matched embeddings, the similarity scores, and the ids
+        """
+        ...
+
+
+class LlamaIndexVectorStore(BaseVectorStore):
+    _li_class: Type[Union[LIVectorStore, BasePydanticVectorStore]]
+
+    def __init__(self, *args, **kwargs):
+        # use getattr on the class to avoid recursing into __getattr__ below
+        if getattr(type(self), "_li_class", None) is None:
+            raise AttributeError(
+                "Require `_li_class` to set a VectorStore class from LlamaIndex"
+            )
+
+        self._client = self._li_class(*args, **kwargs)
+
+    def __setattr__(self, name: str, value: Any) -> None:
+        if name.startswith("_"):
+            return super().__setattr__(name, value)
+
+        return setattr(self._client, name, value)
+
+    def __getattr__(self, name: str) -> Any:
+        return getattr(self._client, name)
+
+    def add(
+        self,
+        embeddings: List[List[float]],
+        metadatas: Optional[List[dict]] = None,
+        ids: Optional[List[str]] = None,
+    ):
+        nodes = [Document(embedding=embedding) for embedding in embeddings]
+        if metadatas is not None:
+            for node, metadata in zip(nodes, metadatas):
+                node.metadata = metadata
+        if ids is not None:
+            for node, id in zip(nodes, ids):
+                node.id_ = id
+
+        return self._client.add(nodes=nodes)  # type: ignore
+
+    def add_from_docs(self, docs: List[Document]):
+        return self._client.add(nodes=docs)  # type: ignore
+
+    def delete(self, ids: List[str], **kwargs):
+        for id_ in ids:
+            self._client.delete(ref_doc_id=id_, **kwargs)
+
+    def query(
+        self,
+        embedding: List[float],
+        top_k: int = 1,
+        ids: Optional[List[str]] = None,
+        **kwargs,
+    ) -> Tuple[List[List[float]], List[float], List[str]]:
+        output = self._client.query(
+            query=VectorStoreQuery(
+                query_embedding=embedding,
+                similarity_top_k=top_k,
+                node_ids=ids,
+            ),
+            **kwargs,
+        )
+
+        embeddings = []
+        if output.nodes:
+            for node in output.nodes:
+                embeddings.append(node.embedding)
+        similarities = output.similarities if output.similarities else []
+        out_ids = output.ids if output.ids else []
+
+        return embeddings, similarities, out_ids
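
`LlamaIndexVectorStore` does the adapting once: `__getattr__`/`__setattr__`
forward everything except underscore-prefixed names to the wrapped llama_index
client, so a concrete store only has to bind `_li_class`. A hypothetical,
untested adapter for another llama_index backend would look like the sketch
below (it assumes that backend accepts the same `add(nodes=...)` /
`delete(ref_doc_id=...)` / `query(...)` calls used above):

    from typing import Type

    from llama_index.vector_stores.simple import SimpleVectorStore

    from kotaemon.vectorstores.base import LlamaIndexVectorStore


    class InMemoryVectorStore(LlamaIndexVectorStore):
        """Hypothetical adapter around llama_index's in-memory SimpleVectorStore."""

        _li_class: Type[SimpleVectorStore] = SimpleVectorStore
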
" + "Please install chromadb first `pip install chromadb`" + ) + + client = chromadb.PersistentClient(path=path) + collection = client.get_or_create_collection(collection_name) + + # pass through for nice IDE support + super().__init__( + chroma_collection=collection, + host=host, + port=port, + ssl=ssl, + headers=headers or {}, + collection_kwargs=collection_kwargs or {}, + stores_text=stores_text, + flat_metadata=flat_metadata, + **kwargs, + ) + self._client = cast(LIChromaVectorStore, self._client) + + def delete(self, ids: List[str], **kwargs): + """Delete vector embeddings from vector stores + + Args: + ids: List of ids of the embeddings to be deleted + kwargs: meant for vectorstore-specific parameters + """ + self._client._collection.delete(ids=ids) diff --git a/setup.py b/setup.py index ef37e8e..91e9b67 100644 --- a/setup.py +++ b/setup.py @@ -46,6 +46,7 @@ setuptools.setup( "coverage", # optional dependency needed for test "openai", + "chromadb", ], }, entry_points={"console_scripts": ["kh=kotaemon.cli:main"]}, diff --git a/tests/test_indexing_retrieval.py b/tests/test_indexing_retrieval.py new file mode 100644 index 0000000..1dc3377 --- /dev/null +++ b/tests/test_indexing_retrieval.py @@ -0,0 +1,59 @@ +import json +from pathlib import Path + +import pytest +from openai.api_resources.embedding import Embedding + +from kotaemon.documents.base import Document +from kotaemon.embeddings.openai import AzureOpenAIEmbeddings +from kotaemon.pipelines.indexing import IndexVectorStoreFromDocumentPipeline +from kotaemon.pipelines.retrieving import RetrieveDocumentFromVectorStorePipeline +from kotaemon.vectorstores import ChromaVectorStore + +with open(Path(__file__).parent / "resources" / "embedding_openai.json") as f: + openai_embedding = json.load(f) + + +@pytest.fixture(scope="function") +def mock_openai_embedding(monkeypatch): + monkeypatch.setattr(Embedding, "create", lambda *args, **kwargs: openai_embedding) + + +def test_indexing(mock_openai_embedding, tmp_path): + db = ChromaVectorStore(path=str(tmp_path)) + embedding = AzureOpenAIEmbeddings( + model="text-embedding-ada-002", + deployment="embedding-deployment", + openai_api_base="https://test.openai.azure.com/", + openai_api_key="some-key", + ) + + pipeline = IndexVectorStoreFromDocumentPipeline( + vector_store=db, embedding=embedding + ) + assert pipeline.vector_store._collection.count() == 0, "Expected empty collection" + pipeline(text=Document(text="Hello world")) + assert pipeline.vector_store._collection.count() == 1, "Index 1 item" + + +def test_retrieving(mock_openai_embedding, tmp_path): + db = ChromaVectorStore(path=str(tmp_path)) + embedding = AzureOpenAIEmbeddings( + model="text-embedding-ada-002", + deployment="embedding-deployment", + openai_api_base="https://test.openai.azure.com/", + openai_api_key="some-key", + ) + + index_pipeline = IndexVectorStoreFromDocumentPipeline( + vector_store=db, embedding=embedding + ) + retrieval_pipeline = RetrieveDocumentFromVectorStorePipeline( + vector_store=db, embedding=embedding + ) + + index_pipeline(text=Document(text="Hello world")) + output = retrieval_pipeline(text=["Hello world", "Hello world"]) + + assert len(output) == 2, "Expected 2 results" + assert output[0] == output[1], "Expected identical results" diff --git a/tests/test_vectorstore.py b/tests/test_vectorstore.py new file mode 100644 index 0000000..96e9ba1 --- /dev/null +++ b/tests/test_vectorstore.py @@ -0,0 +1,61 @@ +from kotaemon.documents.base import Document +from kotaemon.vectorstores import ChromaVectorStore + + 
diff --git a/tests/test_vectorstore.py b/tests/test_vectorstore.py
new file mode 100644
index 0000000..96e9ba1
--- /dev/null
+++ b/tests/test_vectorstore.py
@@ -0,0 +1,61 @@
+from kotaemon.documents.base import Document
+from kotaemon.vectorstores import ChromaVectorStore
+
+
+class TestChromaVectorStore:
+    def test_add(self, tmp_path):
+        """Test that the DB adds entries correctly"""
+        db = ChromaVectorStore(path=str(tmp_path))
+
+        embeddings = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]
+        metadatas = [{"a": 1, "b": 2}, {"a": 3, "b": 4}]
+        ids = ["1", "2"]
+
+        assert db._collection.count() == 0, "Expected empty collection"
+        output = db.add(embeddings=embeddings, metadatas=metadatas, ids=ids)
+        assert output == ids, "Expected output to be the same as ids"
+        assert db._collection.count() == 2, "Expected 2 added entries"
+
+    def test_add_from_docs(self, tmp_path):
+        db = ChromaVectorStore(path=str(tmp_path))
+
+        embeddings = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]
+        metadatas = [{"a": 1, "b": 2}, {"a": 3, "b": 4}]
+        documents = [
+            Document(embedding=embedding, metadata=metadata)
+            for embedding, metadata in zip(embeddings, metadatas)
+        ]
+        assert db._collection.count() == 0, "Expected empty collection"
+        output = db.add_from_docs(documents)
+        assert len(output) == 2, "Expected 2 output ids"
+        assert db._collection.count() == 2, "Expected 2 added entries"
+
+    def test_delete(self, tmp_path):
+        db = ChromaVectorStore(path=str(tmp_path))
+
+        embeddings = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6], [0.7, 0.8, 0.9]]
+        metadatas = [{"a": 1, "b": 2}, {"a": 3, "b": 4}, {"a": 5, "b": 6}]
+        ids = ["a", "b", "c"]
+
+        db.add(embeddings=embeddings, metadatas=metadatas, ids=ids)
+        assert db._collection.count() == 3, "Expected 3 added entries"
+        db.delete(ids=["a", "b"])
+        assert db._collection.count() == 1, "Expected 1 remaining entry"
+        db.delete(ids=["c"])
+        assert db._collection.count() == 0, "Expected 0 remaining entries"
+
+    def test_query(self, tmp_path):
+        db = ChromaVectorStore(path=str(tmp_path))
+
+        embeddings = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6], [0.7, 0.8, 0.9]]
+        metadatas = [{"a": 1, "b": 2}, {"a": 3, "b": 4}, {"a": 5, "b": 6}]
+        ids = ["a", "b", "c"]
+
+        db.add(embeddings=embeddings, metadatas=metadatas, ids=ids)
+
+        _, sim, out_ids = db.query(embedding=[0.1, 0.2, 0.3], top_k=1)
+        assert sim == [0.0]
+        assert out_ids == ["a"]
+
+        _, _, out_ids = db.query(embedding=[0.42, 0.52, 0.53], top_k=1)
+        assert out_ids == ["b"]
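
As `test_query` shows, `BaseVectorStore.query()` hands back an
`(embeddings, similarities, ids)` tuple, with a score of 0.0 asserted for an
exact match under this backend. A small sketch of reading neighbours back out
of a store (path, metadata, and ids invented for illustration):

    from kotaemon.vectorstores import ChromaVectorStore

    db = ChromaVectorStore(path="./chroma")
    db.add(
        embeddings=[[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]],
        metadatas=[{"source": "a.txt"}, {"source": "b.txt"}],
        ids=["a", "b"],
    )

    embs, sims, ids = db.query(embedding=[0.1, 0.2, 0.3], top_k=2)
    for score, id_ in zip(sims, ids):
        print(id_, score)  # "a" comes first: exact match, score 0.0
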