feat(loader): implement markdown parsing in MathpixPDFReader (#498)

* feat(loader): implement markdown parsing in MathpixPDFReader

Add functionality to properly handle PDF content:
- Add parse_markdown_text_to_tables method to separate tables and text
- Fix load_data implementation to properly process documents
- Implement lazy_load_data as a generator that yields Document objects
- Improve document metadata handling for tables and text sections

The loader now correctly processes PDFs through the Mathpix API and converts the returned content into proper Document objects.
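For example, after this change a caller gets back separate table and text Documents (a minimal sketch; the import path and file name are assumptions, and Mathpix API credentials must be configured):

```python
from pathlib import Path

from kotaemon.loaders import MathpixPDFReader  # import path assumed

reader = MathpixPDFReader()  # requires Mathpix API credentials
docs = reader.load_data(Path("paper.pdf"))  # hypothetical file

# Tables and text sections arrive as separate Document objects
# carrying page metadata.
tables = [d for d in docs if d.metadata["type"] == "table"]
texts = [d for d in docs if d.metadata["type"] == "text"]
print(f"{len(tables)} tables, {len(texts)} text sections")
```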

* fix(loader): remove super() calls blocking MathpixPDFReader implementation

Remove the early returns via super() in the load_data and lazy_load_data methods, which prevented the actual implementation from ever executing. This fixes the "not implemented" error while maintaining the full PDF reader functionality.
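The removed pattern looked roughly like this (an illustrative sketch, not the verbatim deleted code): because the method returned the super() result on its first line, the real implementation below it was unreachable.

```python
def load_data(self, file, extra_info=None, **load_kwargs):
    # BaseReader.load_data raises "not implemented"; returning it here
    # made everything below this line dead code.
    return super().load_data(file, extra_info=extra_info, **load_kwargs)
    ...  # the actual Mathpix implementation was never reached
```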
Elias Judin · 2024-12-17 12:30:17 +02:00 · committed by GitHub
parent b1e9f98c5a · commit 54320d08df


@@ -2,7 +2,7 @@ import json
 import re
 import time
 from pathlib import Path
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, Generator, List, Optional, Union

 import requests
 from langchain.utils import get_from_dict_or_env
@@ -10,7 +10,7 @@ from llama_index.core.readers.base import BaseReader
 from kotaemon.base import Document

-from .utils.table import parse_markdown_text_to_tables, strip_special_chars_markdown
+from .utils.table import strip_special_chars_markdown

 # MathpixPDFLoader implementation taken largely from Daniel Gross's:
@@ -21,7 +21,7 @@ class MathpixPDFReader(BaseReader):
     def __init__(
         self,
         processed_file_format: str = "md",
-        max_wait_time_seconds: int = 500,
+        max_wait_time_seconds: int = 900,
         should_clean_pdf: bool = True,
         **kwargs: Any,
     ) -> None:
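Since the default polling budget grows from 500 to 900 seconds here, callers that prefer the old behavior can still pass it explicitly (values in this sketch are illustrative):

```python
reader = MathpixPDFReader(
    processed_file_format="md",
    max_wait_time_seconds=500,  # restore the previous, shorter timeout
    should_clean_pdf=True,
)
```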
@@ -87,22 +87,38 @@ class MathpixPDFReader(BaseReader):
             response = requests.get(url, headers=self._mathpix_headers)
             response_data = response.json()
             status = response_data.get("status", None)
+            print(
+                f"Processing status: {status},"
+                f"Progress: {response_data.get('percent_done', 0)}%"
+            )
             if status == "completed":
                 return
             elif status == "error":
-                raise ValueError("Unable to retrieve PDF from Mathpix")
-            else:
-                print(response_data)
-                print(url)
+                raise ValueError(f"Mathpix processing error: {response_data}")
+            elif status in [
+                "split",
+                "processing",
+            ]:  # Add handling for processing states
                 time.sleep(5)
+                continue
+            else:
+                print(f"Unknown status: {response_data}")
+                time.sleep(5)
-        raise TimeoutError
+        raise TimeoutError(
+            f"Processing did not complete within {self.max_wait_time_seconds} seconds"
+        )

     def get_processed_pdf(self, pdf_id: str) -> str:
         self.wait_for_processing(pdf_id)
         url = f"{self.url}/{pdf_id}.{self.processed_file_format}"
         response = requests.get(url, headers=self._mathpix_headers)
-        return response.content.decode("utf-8")
+        if response.status_code != 200:
+            raise ValueError(f"Failed to get processed PDF: {response.text}")
+        content = response.content.decode("utf-8")
+        print(f"Retrieved content length: {len(content)}")  # Debug print
+        return content

     def clean_pdf(self, contents: str) -> str:
         """Clean the PDF file.
@@ -139,26 +155,79 @@ class MathpixPDFReader(BaseReader):
         contents = re.sub(markup_regex, "", contents)
         return contents

+    def parse_markdown_text_to_tables(
+        self, content: str
+    ) -> tuple[list[tuple[int, str]], list[tuple[int, str]]]:
+        """Parse markdown text to get tables and texts separately.
+
+        Returns:
+            Tuple of (tables, texts) where each is a list of (page_num, content) tuples
+        """
+        print("Starting markdown parsing...")
+        print(f"Content length: {len(content)}")
+
+        # Split by page markers if present
+        pages = re.split(r"(?m)^# Page \d+\n", content)
+
+        tables: list[tuple[int, str]] = []
+        texts: list[tuple[int, str]] = []
+
+        for page_num, page_content in enumerate(pages, 1):
+            if not page_content.strip():
+                continue
+
+            # Extract tables from the page
+            table_matches = re.findall(r"(\|[^\n]+\|(?:\n\|[^\n]+\|)*)", page_content)
+            if table_matches:
+                for table in table_matches:
+                    tables.append(
+                        (page_num, table.strip())
+                    )  # Store as tuple with page number
+                # Remove tables from page content
+                page_content = re.sub(
+                    r"(\|[^\n]+\|(?:\n\|[^\n]+\|)*)", "", page_content
+                )
+
+            # Split remaining content into meaningful chunks
+            chunks = re.split(r"\n\s*\n", page_content)
+            for chunk in chunks:
+                if chunk.strip():
+                    texts.append(
+                        (page_num, chunk.strip())
+                    )  # Store as tuple with page number
+
+        print(f"Found {len(tables)} tables and {len(texts)} text sections")
+        return tables, texts

     def load_data(
-        self, file_path: Path, extra_info: Optional[dict] = None, **kwargs
+        self,
+        file: Union[str, List[str], Path],
+        extra_info: Optional[Dict] = None,
+        **load_kwargs: Any,
     ) -> List[Document]:
-        if "response_content" in kwargs:
-            # overriding response content if specified
-            content = kwargs["response_content"]
+        """Load data from file path."""
+        file_path = Path(file) if isinstance(file, str) else file
+
+        if "response_content" in load_kwargs:
+            content = load_kwargs["response_content"]
         else:
-            # call original API
             pdf_id = self.send_pdf(file_path)
             content = self.get_processed_pdf(pdf_id)
+
         if self.should_clean_pdf:
             content = self.clean_pdf(content)
-        tables, texts = parse_markdown_text_to_tables(content)
+
+        tables, texts = self.parse_markdown_text_to_tables(content)
         documents = []
-        for table in tables:
-            text = strip_special_chars_markdown(table)
+
+        # Handle tables
+        for page_num, table_content in tables:
+            text = strip_special_chars_markdown(table_content)
             metadata = {
-                "table_origin": table,
+                "table_origin": table_content,
                 "type": "table",
+                "page_label": page_num,
+                "page_number": page_num,
             }
             if extra_info:
                 metadata.update(extra_info)
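The new method's table/text separation hinges on two regexes: one that matches a run of `|`-delimited lines, and one that splits the remainder on blank lines. A self-contained sketch of what they produce on a small input (standalone, outside the class, using the same patterns; no page markers, so everything lands on page 1):

```python
import re

content = """Intro paragraph.

| Name | Value |
| ---- | ----- |
| foo  | 42    |

Closing remarks."""

table_re = r"(\|[^\n]+\|(?:\n\|[^\n]+\|)*)"
tables = [(1, t.strip()) for t in re.findall(table_re, content)]
remainder = re.sub(table_re, "", content)
texts = [(1, c.strip()) for c in re.split(r"\n\s*\n", remainder) if c.strip()]

print(tables)
# [(1, '| Name | Value |\n| ---- | ----- |\n| foo  | 42    |')]
print(texts)
# [(1, 'Intro paragraph.'), (1, 'Closing remarks.')]
```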
@@ -171,8 +240,99 @@ class MathpixPDFReader(BaseReader):
                 )
             )

-        for text in texts:
-            metadata = {"source": file_path.name, "type": "text"}
-            documents.append(Document(text=text, metadata=metadata))
+        # Handle text sections
+        for page_num, text_content in texts:
+            if not text_content.strip():
+                continue
+            metadata = {
+                "source": str(file_path),
+                "type": "text",
+                "page_label": page_num,
+                "page_number": page_num,
+            }
+            if extra_info:
+                metadata.update(extra_info)
+            documents.append(Document(text=text_content, metadata=metadata))
+
+        # Fallback if no content was parsed
+        if not documents and content.strip():
+            metadata = {
+                "source": str(file_path),
+                "type": "text",
+                "page_label": 1,
+                "page_number": 1,
+            }
+            if extra_info:
+                metadata.update(extra_info)
+            documents.append(Document(text=content.strip(), metadata=metadata))

         return documents
+
+    def lazy_load_data(
+        self,
+        file: Union[str, List[str], Path],
+        extra_info: Optional[Dict] = None,
+        **load_kwargs: Any,
+    ) -> Generator[Document, None, None]:
+        """Lazy load data from file path."""
+        file_path = Path(file) if isinstance(file, str) else file
+
+        if "response_content" in load_kwargs:
+            content = load_kwargs["response_content"]
+        else:
+            pdf_id = self.send_pdf(file_path)
+            print(f"PDF ID: {pdf_id}")
+            content = self.get_processed_pdf(pdf_id)
+
+        if self.should_clean_pdf:
+            content = self.clean_pdf(content)
+
+        tables, texts = self.parse_markdown_text_to_tables(content)
+
+        # Handle tables
+        for page_num, table_content in tables:  # Changed variable name for clarity
+            text = strip_special_chars_markdown(table_content)  # Pass just the content
+            metadata = {
+                "table_origin": table_content,  # Use table_content here too
+                "type": "table",
+                "page_label": page_num,
+                "page_number": page_num,
+            }
+            if extra_info:
+                metadata.update(extra_info)
+            yield Document(
+                text=text,
+                metadata=metadata,
+                metadata_template="",
+                metadata_seperator="",
+            )
+
+        # Handle text sections
+        for page_num, text_content in texts:  # Changed variable name for clarity
+            if not text_content.strip():
+                continue
+            metadata = {
+                "source": str(file_path),
+                "type": "text",
+                "page_label": page_num,
+                "page_number": page_num,
+            }
+            if extra_info:
+                metadata.update(extra_info)
+            yield Document(
+                text=text_content, metadata=metadata
+            )  # Use text_content directly
+
+        # Fallback if no content was parsed
+        if not (tables or texts) and content.strip():
+            metadata = {
+                "source": str(file_path),
+                "type": "text",
+                "page_label": 1,
+                "page_number": 1,
+            }
+            if extra_info:
+                metadata.update(extra_info)
+            yield Document(text=content.strip(), metadata=metadata)
+
+        print(f"Completed processing PDF: {file_path}")