Fix integration of the indexing and retrieval pipelines with FileIndex (#155)

* Add docs for settings
* Add mdx_truly_sane_lists to doc requirements
This commit is contained in:
Duc Nguyen (john)
2024-03-10 16:41:42 +07:00
committed by GitHub
parent 2b3571e892
commit cb01d27d19
10 changed files with 167 additions and 35 deletions

View File

@@ -36,6 +36,7 @@ class BaseFileIndexRetriever(BaseComponent):
self._Index = resources["Index"]
self._VS = resources["VectorStore"]
self._DS = resources["DocStore"]
self._fs_path = resources["FileStoragePath"]
class BaseFileIndexIndexing(BaseComponent):
@@ -89,3 +90,40 @@ class BaseFileIndexIndexing(BaseComponent):
self._Index = resources["Index"]
self._VS = resources["VectorStore"]
self._DS = resources["DocStore"]
self._fs_path = resources["FileStoragePath"]
def copy_to_filestorage(
    self, file_paths: str | Path | list[str | Path]
) -> list[str]:
    """Copy files into the file storage, naming each by its content hash.

    Each file is hashed with SHA-256 and copied to
    ``self._fs_path / <hexdigest>``, so identical contents map to the
    same stored name and the returned names are stable across calls.

    Args:
        file_paths: a single path or a list of paths to copy.

    Returns:
        The new file names (SHA-256 hex digests), relative to the file
        storage root, in the same order as the input.
    """
    import shutil
    from hashlib import sha256

    if not isinstance(file_paths, list):
        file_paths = [file_paths]

    rel_names: list[str] = []
    for file_path in file_paths:
        digest = sha256()
        # Hash in chunks so large files are not loaded into memory at once.
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(1 << 20), b""):
                digest.update(chunk)
        rel_name = digest.hexdigest()
        target = self._fs_path / rel_name
        # Content-addressed storage: an existing target already holds
        # identical bytes, so the copy can be skipped.
        if not target.exists():
            shutil.copy(file_path, target)
        rel_names.append(rel_name)
    return rel_names
def get_filestorage_path(self, rel_paths: str | list[str]) -> list[str]:
    """Resolve storage-relative names to absolute file-storage paths.

    Args:
        rel_paths: a single name or a list of names, relative to the
            file storage root.

    Returns:
        The absolute paths of the files inside the file storage.
    """
    raise NotImplementedError

View File

@@ -1,7 +1,7 @@
import uuid
from typing import Any, Optional, Type
from ktem.components import get_docstore, get_vectorstore
from ktem.components import filestorage_path, get_docstore, get_vectorstore
from ktem.db.engine import engine
from ktem.index.base import BaseIndex
from sqlalchemy import Column, DateTime, Integer, String
@@ -45,7 +45,7 @@ class FileIndex(BaseIndex):
),
"name": Column(String, unique=True),
"path": Column(String),
"size": Column(Integer),
"size": Column(Integer, default=0),
"text_length": Column(Integer, default=0),
"date_created": Column(
DateTime(timezone=True), server_default=func.now()
@@ -66,11 +66,13 @@ class FileIndex(BaseIndex):
self._db_tables: dict[str, Any] = {"Source": Source, "Index": Index}
self._vs: BaseVectorStore = get_vectorstore(f"index_{self.id}")
self._docstore: BaseDocumentStore = get_docstore(f"index_{self.id}")
self._fs_path = filestorage_path / f"index_{self.id}"
self._resources = {
"Source": Source,
"Index": Index,
"VectorStore": self._vs,
"DocStore": self._docstore,
"FileStoragePath": self._fs_path,
}
self._indexing_pipeline_cls: Type[BaseFileIndexIndexing]
@@ -96,19 +98,19 @@ class FileIndex(BaseIndex):
"""
if "FILE_INDEX_PIPELINE" in self._config:
self._indexing_pipeline_cls = import_dotted_string(
self._config["FILE_INDEX_PIPELINE"]
self._config["FILE_INDEX_PIPELINE"], safe=False
)
return
if hasattr(flowsettings, f"FILE_INDEX_{self.id}_PIPELINE"):
self._indexing_pipeline_cls = import_dotted_string(
getattr(flowsettings, f"FILE_INDEX_{self.id}_PIPELINE")
getattr(flowsettings, f"FILE_INDEX_{self.id}_PIPELINE"), safe=False
)
return
if hasattr(flowsettings, "FILE_INDEX_PIPELINE"):
self._indexing_pipeline_cls = import_dotted_string(
getattr(flowsettings, "FILE_INDEX_PIPELINE")
getattr(flowsettings, "FILE_INDEX_PIPELINE"), safe=False
)
return
@@ -130,14 +132,14 @@ class FileIndex(BaseIndex):
"""
if "FILE_INDEX_RETRIEVER_PIPELINES" in self._config:
self._retriever_pipeline_cls = [
import_dotted_string(each)
import_dotted_string(each, safe=False)
for each in self._config["FILE_INDEX_RETRIEVER_PIPELINES"]
]
return
if hasattr(flowsettings, f"FILE_INDEX_{self.id}_RETRIEVER_PIPELINES"):
self._retriever_pipeline_cls = [
import_dotted_string(each)
import_dotted_string(each, safe=False)
for each in getattr(
flowsettings, f"FILE_INDEX_{self.id}_RETRIEVER_PIPELINES"
)
@@ -146,8 +148,8 @@ class FileIndex(BaseIndex):
if hasattr(flowsettings, "FILE_INDEX_RETRIEVER_PIPELINES"):
self._retriever_pipeline_cls = [
import_dotted_string(each)
for each in getattr(flowsettings, "FILE_INDEX_RETRIEVER_PIPELINE")
import_dotted_string(each, safe=False)
for each in getattr(flowsettings, "FILE_INDEX_RETRIEVER_PIPELINES")
]
return
@@ -165,13 +167,17 @@ class FileIndex(BaseIndex):
"""
self._resources["Source"].metadata.create_all(engine) # type: ignore
self._resources["Index"].metadata.create_all(engine) # type: ignore
self._fs_path.mkdir(parents=True, exist_ok=True)
def on_delete(self):
    """Clean up all persisted resources when the user deletes the index.

    Drops the Source and Index SQL tables, the vector store and the
    document store collections, and removes the on-disk file storage
    directory for this index.
    """
    import shutil

    self._resources["Source"].__table__.drop(engine)  # type: ignore
    self._resources["Index"].__table__.drop(engine)  # type: ignore
    self._vs.drop()
    self._docstore.drop()
    # ignore_errors: the directory may not exist if on_create never ran
    # or the folder was removed out of band — deletion should still succeed.
    shutil.rmtree(self._fs_path, ignore_errors=True)
def get_selector_component_ui(self):
    """Build the UI component that lets the user pick files from this index."""
    selector = FileSelector(self._app, self)
    return selector

View File

@@ -279,7 +279,11 @@ class FileIndexPage(BasePage):
# get the pipeline
indexing_pipeline = self._index.get_indexing_pipeline(settings)
output_nodes, _ = indexing_pipeline(files, reindex=reindex)
result = indexing_pipeline(files, reindex=reindex)
if result is None:
gr.Info("Finish indexing")
return
output_nodes, _ = result
gr.Info(f"Finish indexing into {len(output_nodes)} chunks")
# download the file