diff --git a/libs/kotaemon/kotaemon/indices/ingests/files.py b/libs/kotaemon/kotaemon/indices/ingests/files.py
index 6d8cda0..d82bc16 100644
--- a/libs/kotaemon/kotaemon/indices/ingests/files.py
+++ b/libs/kotaemon/kotaemon/indices/ingests/files.py
@@ -21,8 +21,10 @@ from kotaemon.loaders import (
     PDFThumbnailReader,
     TxtReader,
     UnstructuredReader,
+    WebReader,
 )
 
+web_reader = WebReader()
 unstructured = UnstructuredReader()
 adobe_reader = AdobeReader()
 azure_reader = AzureAIDocumentIntelligenceLoader(
diff --git a/libs/kotaemon/kotaemon/loaders/__init__.py b/libs/kotaemon/kotaemon/loaders/__init__.py
index 624a76b..0c3fbcb 100644
--- a/libs/kotaemon/kotaemon/loaders/__init__.py
+++ b/libs/kotaemon/kotaemon/loaders/__init__.py
@@ -10,6 +10,7 @@ from .ocr_loader import ImageReader, OCRReader
 from .pdf_loader import PDFThumbnailReader
 from .txt_loader import TxtReader
 from .unstructured_loader import UnstructuredReader
+from .web_loader import WebReader
 
 __all__ = [
     "AutoReader",
@@ -28,4 +29,5 @@ __all__ = [
     "AdobeReader",
     "TxtReader",
     "PDFThumbnailReader",
+    "WebReader",
 ]
diff --git a/libs/kotaemon/kotaemon/loaders/web_loader.py b/libs/kotaemon/kotaemon/loaders/web_loader.py
new file mode 100644
index 0000000..fc73aa6
--- /dev/null
+++ b/libs/kotaemon/kotaemon/loaders/web_loader.py
@@ -0,0 +1,43 @@
+from pathlib import Path
+from typing import Optional
+
+import requests
+from decouple import config
+
+from kotaemon.base import Document
+
+from .base import BaseReader
+
+JINA_API_KEY = config("JINA_API_KEY", default="")
+JINA_URL = config("JINA_URL", default="https://r.jina.ai/")
+
+
+class WebReader(BaseReader):
+    def run(
+        self, file_path: str | Path, extra_info: Optional[dict] = None, **kwargs
+    ) -> list[Document]:
+        return self.load_data(Path(file_path), extra_info=extra_info, **kwargs)
+
+    def fetch_url(self, url: str):
+        # setup the request
+        api_url = f"https://r.jina.ai/{url}"
+        headers = {
+            "X-With-Links-Summary": "true",
+        }
+        if JINA_API_KEY:
+            headers["Authorization"] = f"Bearer {JINA_API_KEY}"
+
+        response = requests.get(api_url, headers=headers)
+        response.raise_for_status()
+
+        data = response.text
+        return data
+
+    def load_data(
+        self, file_path: str | Path, extra_info: Optional[dict] = None, **kwargs
+    ) -> list[Document]:
+        file_path = str(file_path)
+        output = self.fetch_url(file_path)
+        metadata = extra_info or {}
+
+        return [Document(text=output, metadata=metadata)]
diff --git a/libs/ktem/ktem/index/file/graph/pipelines.py b/libs/ktem/ktem/index/file/graph/pipelines.py
index 15bb143..4d356a6 100644
--- a/libs/ktem/ktem/index/file/graph/pipelines.py
+++ b/libs/ktem/ktem/index/file/graph/pipelines.py
@@ -57,7 +57,7 @@ def prepare_graph_index_path(graph_id: str):
 class GraphRAGIndexingPipeline(IndexDocumentPipeline):
     """GraphRAG specific indexing pipeline"""
 
-    def route(self, file_path: Path) -> IndexPipeline:
+    def route(self, file_path: str | Path) -> IndexPipeline:
         """Simply disable the splitter (chunking) for this pipeline"""
         pipeline = super().route(file_path)
         pipeline.splitter = None
diff --git a/libs/ktem/ktem/index/file/knet/pipelines.py b/libs/ktem/ktem/index/file/knet/pipelines.py
index 9741e5a..f49bdb1 100644
--- a/libs/ktem/ktem/index/file/knet/pipelines.py
+++ b/libs/ktem/ktem/index/file/knet/pipelines.py
@@ -32,7 +32,7 @@ class KnetIndexingPipeline(IndexDocumentPipeline):
             },
         }
 
-    def route(self, file_path: Path) -> IndexPipeline:
+    def route(self, file_path: str | Path) -> IndexPipeline:
         """Simply disable the splitter (chunking) for this pipeline"""
         pipeline = super().route(file_path)
         pipeline.splitter = None
diff --git a/libs/ktem/ktem/index/file/pipelines.py b/libs/ktem/ktem/index/file/pipelines.py
index cd94852..b28e243 100644
--- a/libs/ktem/ktem/index/file/pipelines.py
+++ b/libs/ktem/ktem/index/file/pipelines.py
@@ -39,6 +39,7 @@ from kotaemon.indices.ingests.files import (
     adobe_reader,
     azure_reader,
     unstructured,
+    web_reader,
 )
 from kotaemon.indices.rankings import BaseReranking, LLMReranking, LLMTrulensScoring
 from kotaemon.indices.splitters import BaseSplitter, TokenSplitter
@@ -444,7 +445,7 @@ class IndexPipeline(BaseComponent):
             session.add_all(nodes)
             session.commit()
 
-    def get_id_if_exists(self, file_path: Path) -> Optional[str]:
+    def get_id_if_exists(self, file_path: str | Path) -> Optional[str]:
         """Check if the file is already indexed
 
         Args:
@@ -453,13 +454,14 @@
         Returns:
             the file id if the file is indexed, otherwise None
         """
+        file_name = file_path.name if isinstance(file_path, Path) else file_path
         if self.private:
             cond: tuple = (
-                self.Source.name == file_path.name,
+                self.Source.name == file_name,
                 self.Source.user == self.user_id,
             )
         else:
-            cond = (self.Source.name == file_path.name,)
+            cond = (self.Source.name == file_name,)
 
         with Session(engine) as session:
             stmt = select(self.Source).where(*cond)
@@ -469,6 +471,29 @@
 
         return None
 
+    def store_url(self, url: str) -> str:
+        """Store URL into the database and storage, return the file id
+
+        Args:
+            url: the URL
+
+        Returns:
+            the file id
+        """
+        file_hash = sha256(url.encode()).hexdigest()
+        source = self.Source(
+            name=url,
+            path=file_hash,
+            size=0,
+            user=self.user_id,  # type: ignore
+        )
+        with Session(engine) as session:
+            session.add(source)
+            session.commit()
+            file_id = source.id
+
+        return file_id
+
     def store_file(self, file_path: Path) -> str:
         """Store file into the database and storage, return the file id
 
@@ -495,7 +520,7 @@
 
         return file_id
 
-    def finish(self, file_id: str, file_path: Path) -> str:
+    def finish(self, file_id: str, file_path: str | Path) -> str:
         """Finish the indexing"""
         with Session(engine) as session:
             stmt = select(self.Source).where(self.Source.id == file_id)
@@ -561,37 +586,55 @@
     def stream(
         self, file_path: str | Path, reindex: bool, **kwargs
    ) -> Generator[Document, None, tuple[str, list[Document]]]:
-        # check for duplication
-        file_path = Path(file_path).resolve()
+        # check if the file is already indexed
+        if isinstance(file_path, Path):
+            file_path = file_path.resolve()
+
         file_id = self.get_id_if_exists(file_path)
-        if file_id is not None:
-            if not reindex:
-                raise ValueError(
-                    f"File {file_path.name} already indexed. Please rerun with "
-                    "reindex=True to force reindexing."
-                )
+
+        if isinstance(file_path, Path):
+            if file_id is not None:
+                if not reindex:
+                    raise ValueError(
+                        f"File {file_path.name} already indexed. Please rerun with "
+                        "reindex=True to force reindexing."
+                    )
+                else:
+                    # remove the existing records
+                    yield Document(
+                        f" => Removing old {file_path.name}", channel="debug"
+                    )
+                    self.delete_file(file_id)
+                    file_id = self.store_file(file_path)
             else:
-                # remove the existing records
-                yield Document(f" => Removing old {file_path.name}", channel="debug")
-                self.delete_file(file_id)
+                # add record to db
                 file_id = self.store_file(file_path)
         else:
-            # add record to db
-            file_id = self.store_file(file_path)
+            if file_id is not None:
+                raise ValueError(f"URL {file_path} already indexed.")
+            else:
+                # add record to db
+                file_id = self.store_url(file_path)
 
         # extract the file
-        extra_info = default_file_metadata_func(str(file_path))
+        if isinstance(file_path, Path):
+            extra_info = default_file_metadata_func(str(file_path))
+            file_name = file_path.name
+        else:
+            extra_info = {"file_name": file_path}
+            file_name = file_path
+
         extra_info["file_id"] = file_id
         extra_info["collection_name"] = self.collection_name
 
-        yield Document(f" => Converting {file_path.name} to text", channel="debug")
+        yield Document(f" => Converting {file_name} to text", channel="debug")
         docs = self.loader.load_data(file_path, extra_info=extra_info)
-        yield Document(f" => Converted {file_path.name} to text", channel="debug")
-        yield from self.handle_docs(docs, file_id, file_path.name)
+        yield Document(f" => Converted {file_name} to text", channel="debug")
+        yield from self.handle_docs(docs, file_id, file_name)
 
         self.finish(file_id, file_path)
 
-        yield Document(f" => Finished indexing {file_path.name}", channel="debug")
+        yield Document(f" => Finished indexing {file_name}", channel="debug")
         return file_id, docs
 
 
@@ -658,20 +701,30 @@ class IndexDocumentPipeline(BaseFileIndexIndexing):
         )
         return obj
 
-    def route(self, file_path: Path) -> IndexPipeline:
+    def is_url(self, file_path: str | Path) -> bool:
+        return isinstance(file_path, str) and (
+            file_path.startswith("http://") or file_path.startswith("https://")
+        )
+
+    def route(self, file_path: str | Path) -> IndexPipeline:
         """Decide the pipeline based on the file type
 
         Can subclass this method for a more elaborate pipeline routing strategy.
         """
         _, chunk_size, chunk_overlap = dev_settings()
 
-        ext = file_path.suffix.lower()
-        reader = self.readers.get(ext, unstructured)
-        if reader is None:
-            raise NotImplementedError(
-                f"No supported pipeline to index {file_path.name}. Please specify "
-                "the suitable pipeline for this file type in the settings."
-            )
+        # check if file_path is a URL
+        if self.is_url(file_path):
+            reader = web_reader
+        else:
+            assert isinstance(file_path, Path)
+            ext = file_path.suffix.lower()
+            reader = self.readers.get(ext, unstructured)
+            if reader is None:
+                raise NotImplementedError(
+                    f"No supported pipeline to index {file_path.name}. Please specify "
+                    "the suitable pipeline for this file type in the settings."
+                )
 
         print("Using reader", reader)
         pipeline: IndexPipeline = IndexPipeline(
@@ -715,9 +768,14 @@ class IndexDocumentPipeline(BaseFileIndexIndexing):
 
         n_files = len(file_paths)
         for idx, file_path in enumerate(file_paths):
-            file_path = Path(file_path)
+            if self.is_url(file_path):
+                file_name = file_path
+            else:
+                file_path = Path(file_path)
+                file_name = file_path.name
+
             yield Document(
-                content=f"Indexing [{idx + 1}/{n_files}]: {file_path.name}",
+                content=f"Indexing [{idx + 1}/{n_files}]: {file_name}",
                 channel="debug",
             )
 
@@ -730,7 +788,11 @@
                 file_ids.append(file_id)
                 errors.append(None)
                 yield Document(
-                    content={"file_path": file_path, "status": "success"},
+                    content={
+                        "file_path": file_path,
+                        "file_name": file_name,
+                        "status": "success",
+                    },
                     channel="index",
                 )
             except Exception as e:
@@ -740,6 +802,7 @@
                 yield Document(
                     content={
                         "file_path": file_path,
+                        "file_name": file_name,
                         "status": "failed",
                         "message": str(e),
                     },
diff --git a/libs/ktem/ktem/index/file/ui.py b/libs/ktem/ktem/index/file/ui.py
index 9858e46..654bdf1 100644
--- a/libs/ktem/ktem/index/file/ui.py
+++ b/libs/ktem/ktem/index/file/ui.py
@@ -111,18 +111,25 @@ class FileIndexPage(BasePage):
         """Build the UI of the app"""
         with gr.Row():
             with gr.Column(scale=1):
-                gr.Markdown("## File Upload")
                 with gr.Column() as self.upload:
-                    self.files = File(
-                        file_types=self._supported_file_types,
-                        file_count="multiple",
-                        container=True,
-                        show_label=False,
-                    )
+                    with gr.Tab("Upload Files"):
+                        self.files = File(
+                            file_types=self._supported_file_types,
+                            file_count="multiple",
+                            container=True,
+                            show_label=False,
+                        )
 
-                    msg = self.upload_instruction()
-                    if msg:
-                        gr.Markdown(msg)
+                        msg = self.upload_instruction()
+                        if msg:
+                            gr.Markdown(msg)
+
+                    with gr.Tab("Use Web Links"):
+                        self.urls = gr.Textbox(
+                            label="Input web URLs",
+                            lines=8,
+                        )
+                        gr.Markdown("(separated by new line)")
 
                 with gr.Accordion("Advanced indexing options", open=True):
                     with gr.Row():
@@ -525,6 +532,7 @@
             fn=self.index_fn,
             inputs=[
                 self.files,
+                self.urls,
                 self.reindex,
                 self._app.settings_state,
                 self._app.user_id,
@@ -670,28 +678,33 @@
         return remaining_files
 
     def index_fn(
-        self, files, reindex: bool, settings, user_id
+        self, files, urls, reindex: bool, settings, user_id
     ) -> Generator[tuple[str, str], None, None]:
         """Upload and index the files
 
         Args:
             files: the list of files to be uploaded
+            urls: list of web URLs to be indexed
             reindex: whether to reindex the files
             selected_files: the list of files already selected
             settings: the settings of the app
         """
-        if not files:
-            gr.Info("No uploaded file")
-            yield "", ""
-            return
+        if urls:
+            files = [it.strip() for it in urls.split("\n")]
+            errors = []
+        else:
+            if not files:
+                gr.Info("No uploaded file")
+                yield "", ""
+                return
 
-        files = self._may_extract_zip(files, flowsettings.KH_ZIP_INPUT_DIR)
+            files = self._may_extract_zip(files, flowsettings.KH_ZIP_INPUT_DIR)
 
-        errors = self.validate(files)
-        if errors:
-            gr.Warning(", ".join(errors))
-            yield "", ""
-            return
+            errors = self.validate(files)
+            if errors:
+                gr.Warning(", ".join(errors))
+                yield "", ""
+                return
 
         gr.Info(f"Start indexing {len(files)} files...")
 
@@ -708,10 +721,10 @@
                 continue
             if response.channel == "index":
                 if response.content["status"] == "success":
-                    outputs.append(f"\u2705 | {response.content['file_path'].name}")
+                    outputs.append(f"\u2705 | {response.content['file_name']}")
                 elif response.content["status"] == "failed":
                     outputs.append(
-                        f"\u274c | {response.content['file_path'].name}: "
+                        f"\u274c | {response.content['file_name']}: "
                         f"{response.content['message']}"
                     )
             elif response.channel == "debug":
@@ -764,7 +777,7 @@
         settings[f"index.options.{self._index.id}.reader_mode"] = "default"
         settings[f"index.options.{self._index.id}.quick_index_mode"] = True
         if to_process_files:
-            _iter = self.index_fn(to_process_files, reindex, settings, user_id)
+            _iter = self.index_fn(to_process_files, [], reindex, settings, user_id)
             try:
                 while next(_iter):
                     pass
@@ -844,7 +857,7 @@
         for p in exclude_patterns:
             files = [f for f in files if not fnmatch.fnmatch(name=f, pat=p)]
 
-        yield from self.index_fn(files, reindex, settings, user_id)
+        yield from self.index_fn(files, [], reindex, settings, user_id)
 
     def format_size_human_readable(self, num: float | str, suffix="B"):
         try:
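Usage note (not part of the patch): a minimal sketch of exercising the new WebReader on its own, assuming the kotaemon package from this branch is importable and the Jina reader endpoint (https://r.jina.ai/) is reachable; JINA_API_KEY is optional and the URL below is a placeholder.

# Illustrative sketch only; everything except WebReader/Document is a placeholder.
from kotaemon.loaders import WebReader

reader = WebReader()
url = "https://example.com/article"  # placeholder URL
# load_data() fetches the page through https://r.jina.ai/<url> and wraps the
# returned text in a single Document; extra_info becomes the Document metadata.
docs = reader.load_data(url, extra_info={"file_name": url})
print(docs[0].text[:200])   # page content returned by the reader endpoint
print(docs[0].metadata)     # includes {"file_name": "https://example.com/article"}

In the indexing flow itself, URLs typed into the "Use Web Links" tab are passed to index_fn as strings, routed to web_reader by IndexDocumentPipeline.is_url, and recorded via store_url instead of store_file.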