* add OCRReader, MathPixReader and ExcelReader * update test case for ocr reader * reformat * minor fix
176 lines
5.8 KiB
Python
176 lines
5.8 KiB
Python
import json
|
|
import re
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List
|
|
|
|
import requests
|
|
from langchain.utils import get_from_dict_or_env
|
|
from llama_index.readers.base import BaseReader
|
|
|
|
from kotaemon.documents import Document
|
|
|
|
from .utils.table import parse_markdown_text_to_tables, strip_special_chars_markdown
|
|
|
|
|
|
# MathpixPDFLoader implementation taken largely from Daniel Gross's:
|
|
# https://gist.github.com/danielgross/3ab4104e14faccc12b49200843adab21
|
|
class MathpixPDFReader(BaseReader):
|
|
"""Load `PDF` files using `Mathpix` service."""
|
|
|
|
def __init__(
|
|
self,
|
|
processed_file_format: str = "md",
|
|
max_wait_time_seconds: int = 500,
|
|
should_clean_pdf: bool = True,
|
|
**kwargs: Any,
|
|
) -> None:
|
|
"""Initialize with a file path.
|
|
|
|
Args:
|
|
processed_file_format: a format of the processed file. Default is "mmd".
|
|
max_wait_time_seconds: a maximum time to wait for the response from
|
|
the server. Default is 500.
|
|
should_clean_pdf: a flag to clean the PDF file. Default is False.
|
|
**kwargs: additional keyword arguments.
|
|
"""
|
|
self.mathpix_api_key = get_from_dict_or_env(
|
|
kwargs, "mathpix_api_key", "MATHPIX_API_KEY", default="empty"
|
|
)
|
|
self.mathpix_api_id = get_from_dict_or_env(
|
|
kwargs, "mathpix_api_id", "MATHPIX_API_ID", default="empty"
|
|
)
|
|
self.processed_file_format = processed_file_format
|
|
self.max_wait_time_seconds = max_wait_time_seconds
|
|
self.should_clean_pdf = should_clean_pdf
|
|
super().__init__()
|
|
|
|
@property
|
|
def _mathpix_headers(self) -> Dict[str, str]:
|
|
return {"app_id": self.mathpix_api_id, "app_key": self.mathpix_api_key}
|
|
|
|
@property
|
|
def url(self) -> str:
|
|
return "https://api.mathpix.com/v3/pdf"
|
|
|
|
@property
|
|
def data(self) -> dict:
|
|
options = {
|
|
"conversion_formats": {self.processed_file_format: True},
|
|
"enable_tables_fallback": True,
|
|
}
|
|
return {"options_json": json.dumps(options)}
|
|
|
|
def send_pdf(self, file_path) -> str:
|
|
with open(file_path, "rb") as f:
|
|
files = {"file": f}
|
|
response = requests.post(
|
|
self.url, headers=self._mathpix_headers, files=files, data=self.data
|
|
)
|
|
response_data = response.json()
|
|
if "pdf_id" in response_data:
|
|
pdf_id = response_data["pdf_id"]
|
|
return pdf_id
|
|
else:
|
|
raise ValueError("Unable to send PDF to Mathpix.")
|
|
|
|
def wait_for_processing(self, pdf_id: str) -> None:
|
|
"""Wait for processing to complete.
|
|
|
|
Args:
|
|
pdf_id: a PDF id.
|
|
|
|
Returns: None
|
|
"""
|
|
url = self.url + "/" + pdf_id
|
|
for _ in range(0, self.max_wait_time_seconds, 5):
|
|
response = requests.get(url, headers=self._mathpix_headers)
|
|
response_data = response.json()
|
|
status = response_data.get("status", None)
|
|
|
|
if status == "completed":
|
|
return
|
|
elif status == "error":
|
|
raise ValueError("Unable to retrieve PDF from Mathpix")
|
|
else:
|
|
print(response_data)
|
|
print(url)
|
|
time.sleep(5)
|
|
raise TimeoutError
|
|
|
|
def get_processed_pdf(self, pdf_id: str) -> str:
|
|
self.wait_for_processing(pdf_id)
|
|
url = f"{self.url}/{pdf_id}.{self.processed_file_format}"
|
|
response = requests.get(url, headers=self._mathpix_headers)
|
|
return response.content.decode("utf-8")
|
|
|
|
def clean_pdf(self, contents: str) -> str:
|
|
"""Clean the PDF file.
|
|
|
|
Args:
|
|
contents: a PDF file contents.
|
|
|
|
Returns:
|
|
|
|
"""
|
|
contents = "\n".join(
|
|
[line for line in contents.split("\n") if not line.startswith("![]")]
|
|
)
|
|
# replace \section{Title} with # Title
|
|
contents = contents.replace("\\section{", "# ")
|
|
# replace the "\" slash that Mathpix adds to escape $, %, (, etc.
|
|
|
|
# http:// or https:// followed by anything but a closing paren
|
|
url_regex = "http[s]?://[^)]+"
|
|
markup_regex = r"\[]\(\s*({0})\s*\)".format(url_regex)
|
|
contents = (
|
|
contents.replace(r"\$", "$")
|
|
.replace(r"\%", "%")
|
|
.replace(r"\(", "(")
|
|
.replace(r"\)", ")")
|
|
.replace("$\\begin{array}", "")
|
|
.replace("\\end{array}$", "")
|
|
.replace("\\\\", "")
|
|
.replace("\\text", "")
|
|
.replace("}", "")
|
|
.replace("{", "")
|
|
.replace("\\mathrm", "")
|
|
)
|
|
contents = re.sub(markup_regex, "", contents)
|
|
return contents
|
|
|
|
def load_data(self, file_path: Path, **kwargs) -> List[Document]:
|
|
if "response_content" in kwargs:
|
|
# overriding response content if specified
|
|
content = kwargs["response_content"]
|
|
else:
|
|
# call original API
|
|
pdf_id = self.send_pdf(file_path)
|
|
content = self.get_processed_pdf(pdf_id)
|
|
|
|
if self.should_clean_pdf:
|
|
content = self.clean_pdf(content)
|
|
tables, texts = parse_markdown_text_to_tables(content)
|
|
documents = []
|
|
for table in tables:
|
|
text = strip_special_chars_markdown(table)
|
|
metadata = {
|
|
"source": file_path.name,
|
|
"table_origin": table,
|
|
"type": "table",
|
|
}
|
|
documents.append(
|
|
Document(
|
|
text=text,
|
|
metadata=metadata,
|
|
metadata_template="",
|
|
metadata_seperator="",
|
|
)
|
|
)
|
|
|
|
for text in texts:
|
|
metadata = {"source": file_path.name, "type": "text"}
|
|
documents.append(Document(text=text, metadata=metadata))
|
|
|
|
return documents
|