From 4f189dc9313597cd560ee74239e5e59b165be50c Mon Sep 17 00:00:00 2001
From: "Nguyen Trung Duc (john)"
Date: Mon, 25 Sep 2023 17:20:03 +0700
Subject: [PATCH] [AUR-408] Export logs to Excel (#23)

This CL implements:
- The logic to export logs to Excel.
- Routing of the export logic into the UI.
- A demonstration of this functionality in the `./examples/promptui` project.
---
 knowledgehub/contribs/promptui/export.py | 137 +++++++++++++++++++++++
 knowledgehub/contribs/promptui/ui.py     |  51 ++++++++-
 setup.py                                 |   1 +
 tests/simple_pipeline.py                 |  43 +++++++
 tests/test_promptui.py                   |  97 +++++++---------
 5 files changed, 265 insertions(+), 64 deletions(-)
 create mode 100644 tests/simple_pipeline.py

diff --git a/knowledgehub/contribs/promptui/export.py b/knowledgehub/contribs/promptui/export.py
index 61e8e41..23eec2a 100644
--- a/knowledgehub/contribs/promptui/export.py
+++ b/knowledgehub/contribs/promptui/export.py
@@ -1 +1,138 @@
 """Export logs into Excel file"""
+import os
+import pickle
+from pathlib import Path
+from typing import Any, Dict, List, Type, Union
+
+import pandas as pd
+import yaml
+from theflow.storage import storage
+from theflow.utils.modules import import_dotted_string
+
+from kotaemon.base import BaseComponent
+
+
+def from_log_to_dict(pipeline_cls: Type[BaseComponent], log_config: dict) -> dict:
+    """Collect a pipeline's logged runs into a dict of columns
+
+    Args:
+        pipeline_cls (Type[BaseComponent]): pipeline class whose logs to collect
+        log_config (dict): log config of one sheet (its `inputs` and `outputs`)
+
+    Returns:
+        dict: mapping from column name to list of values (one per logged run),
+            suitable for constructing a pandas DataFrame
+    """
+    # get the log directory, which holds one sub-directory per run
+    pipeline_log_path = storage.url(pipeline_cls().config.store_result)
+    dirs = list(sorted([f.path for f in os.scandir(pipeline_log_path) if f.is_dir()]))
+
+    ids = []
+    params: Dict[str, List[Any]] = {}
+    inputs: Dict[str, List[Any]] = {}
+    outputs: Dict[str, List[Any]] = {}
+
+    for idx, each_dir in enumerate(dirs):
+        ids.append(str(Path(each_dir).name))
+
+        # get the params
+        params_file = os.path.join(each_dir, "params.pkl")
+        if os.path.exists(params_file):
+            with open(params_file, "rb") as f:
+                each_params = pickle.load(f)
+                for key, value in each_params.items():
+                    if key not in params:
+                        params[key] = [None] * len(dirs)
+                    params[key][idx] = value
+
+        progress_file = os.path.join(each_dir, "progress.pkl")
+        if os.path.exists(progress_file):
+            with open(progress_file, "rb") as f:
+                progress = pickle.load(f)
+
+            # get the inputs
+            for each_input in log_config["inputs"]:
+                name = each_input["name"]
+                step = each_input["step"]
+                if name not in inputs:
+                    inputs[name] = [None] * len(dirs)
+                variable = each_input.get("variable", "")
+                if variable:
+                    inputs[name][idx] = progress[step]["input"]["kwargs"][variable]
+                else:
+                    inputs[name][idx] = progress[step]["input"]
+
+            # get the outputs; `item` optionally selects one field of the output
+            for each_output in log_config["outputs"]:
+                name = each_output["name"]
+                step = each_output["step"]
+                if name not in outputs:
+                    outputs[name] = [None] * len(dirs)
+                outputs[name][idx] = progress[step]["output"]
+                if each_output.get("item", ""):
+                    outputs[name][idx] = outputs[name][idx][each_output["item"]]
+
+    return {"ids": ids, **params, **inputs, **outputs}
+
+
+def export(config: dict, pipeline_def, output_path):
+    """Export the logs described by config into an Excel file at output_path"""
+
+    pipeline_name = f"{pipeline_def.__module__}.{pipeline_def.__name__}"
+
+    # make sure there is something to export
+    if not config.get("logs", {}):
+        raise ValueError(f"Pipeline {pipeline_name} has no logs to export")
+
+    pds: Dict[str, pd.DataFrame] = {}
+    for log_name, log_def in config["logs"].items():
+        pds[log_name] = pd.DataFrame(from_log_to_dict(pipeline_def, log_def))
+
+    # write each log's dataframe to its own sheet of the Excel file
+    with pd.ExcelWriter(output_path, engine="openpyxl") as writer:  # type: ignore
+        for log_name, df in pds.items():
+            df.to_excel(writer, sheet_name=log_name)
+
+
+def export_from_dict(
+    config: Union[str, dict],
+    pipeline: Union[str, Type[BaseComponent]],
+    output_path: str,
+):
+    """Export the logs of a pipeline into an Excel file
+
+    Args:
+        config (str | dict): path to the config file, or the config dict itself
+        pipeline (str | type): dotted name of the pipeline, or the pipeline class
+        output_path (str): path to the output Excel file
+    """
+    # get the pipeline class and the relevant config dict
+    config_dict: dict
+    if isinstance(config, str):
+        with open(config) as f:
+            config_dict = yaml.safe_load(f)
+    elif isinstance(config, dict):
+        config_dict = config
+    else:
+        raise TypeError(f"`config` must be str or dict, not {type(config)}")
+
+    pipeline_name: str
+    pipeline_cls: Type[BaseComponent]
+    pipeline_config: dict
+    if isinstance(pipeline, str):
+        if pipeline not in config_dict:
+            raise ValueError(f"Pipeline {pipeline} not found in config file")
+        pipeline_name = pipeline
+        pipeline_cls = import_dotted_string(pipeline, safe=False)
+        pipeline_config = config_dict[pipeline]
+    elif isinstance(pipeline, type) and issubclass(pipeline, BaseComponent):
+        pipeline_name = f"{pipeline.__module__}.{pipeline.__name__}"
+        if pipeline_name not in config_dict:
+            raise ValueError(f"Pipeline {pipeline_name} not found in config file")
+        pipeline_cls = pipeline
+        pipeline_config = config_dict[pipeline_name]
+    else:
+        raise TypeError(
+            f"`pipeline` must be str or subclass of BaseComponent, not {type(pipeline)}"
+        )
+
+    export(pipeline_config, pipeline_cls, output_path)
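For reference, the `logs` section that `from_log_to_dict` consumes is keyed by sheet name, and each sheet lists the columns to pull out of each run's `progress.pkl`. A minimal sketch of such a section, written as the Python sub-dict that sits under a pipeline's entry in the config (the step path "." denotes the pipeline's root step; the column names are illustrative):

    logs_config = {
        "sheet1": {
            "inputs": [
                # column `text`: kwarg `text` of the root step's logged input
                {"name": "text", "step": ".", "variable": "text"},
            ],
            "outputs": [
                # column `answer`: the root step's logged output; an optional
                # `item` key would select a single field from that output
                {"name": "answer", "step": "."},
            ],
        },
    }

Each sheet becomes one tab in the Excel file, with one row per logged run plus an `ids` column naming the run directory.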
diff --git a/knowledgehub/contribs/promptui/ui.py b/knowledgehub/contribs/promptui/ui.py
index 2398a72..cb0fcba 100644
--- a/knowledgehub/contribs/promptui/ui.py
+++ b/knowledgehub/contribs/promptui/ui.py
@@ -1,13 +1,20 @@
+import pickle
+from datetime import datetime
+from pathlib import Path
 from typing import Union
 
 import gradio as gr
 import yaml
+from theflow.storage import storage
 from theflow.utils.modules import import_dotted_string
 
 from kotaemon.contribs.promptui.base import COMPONENTS_CLASS, SUPPORTED_COMPONENTS
+from kotaemon.contribs.promptui.export import export
 
 USAGE_INSTRUCTION = """In case of errors, you can:
+- Consult the PromptUI instructions at:
+    https://github.com/Cinnamon/kotaemon/wiki/Utilities#prompt-engineering-ui
 - Create bug fix and make PR at: https://github.com/Cinnamon/kotaemon
 - Ping any of @john @tadashi @ian @jacky in Slack channel #llm-productization"""
@@ -73,6 +80,8 @@ def construct_ui(config, func_run, func_export) -> gr.Blocks:
 
             outputs.append(component)
 
+    exported_file = gr.File(label="Output file", show_label=True)
+
     temp = gr.Tab
     with gr.Blocks(analytics_enabled=False, title="Welcome to PromptUI") as demo:
         with gr.Accordion(label="Usage", open=False):
@@ -80,8 +89,10 @@
         with gr.Row():
             run_btn = gr.Button("Run")
             run_btn.click(func_run, inputs=inputs + params, outputs=outputs)
-            export_btn = gr.Button("Export")
-            export_btn.click(func_export, inputs=None, outputs=None)
+            export_btn = gr.Button(
+                "Export (the result will appear in the `Exported file` tab)"
+            )
+            export_btn.click(func_export, inputs=None, outputs=exported_file)
         with gr.Row():
             with gr.Column():
                 with temp("Inputs"):
@@ -91,8 +102,11 @@ def construct_ui(config, func_run, func_export) -> gr.Blocks:
                 for component in params:
                     component.render()
             with gr.Column():
-                for component in outputs:
-                    component.render()
+                with temp("Outputs"):
+                    for component in outputs:
+                        component.render()
+                with temp("Exported file"):
+                    exported_file.render()
 
     return demo
@@ -103,6 +117,10 @@ def build_pipeline_ui(config: dict, pipeline_def):
     params_name = list(config.get("params", {}).keys())
     outputs_def = config.get("outputs", [])
 
+    output_dir: Path = Path(storage.url(pipeline_def().config.store_result))
+    exported_dir = output_dir.parent / "exported"
+    exported_dir.mkdir(parents=True, exist_ok=True)
+
     def run_func(*args):
         inputs = {
             name: value for name, value in zip(inputs_name, args[: len(inputs_name)])
         }
@@ -113,6 +131,13 @@ def build_pipeline_ui(config: dict, pipeline_def):
         pipeline = pipeline_def()
         pipeline.set(params)
         pipeline(**inputs)
+        with storage.open(
+            storage.url(
+                pipeline.config.store_result, pipeline.last_run.id(), "params.pkl"
+            ),
+            "wb",
+        ) as f:
+            pickle.dump(params, f)
         if outputs_def:
             outputs = []
             for output_def in outputs_def:
@@ -122,8 +147,20 @@ def build_pipeline_ui(config: dict, pipeline_def):
                 outputs.append(output)
             return outputs
 
-    # TODO: export_func is None for now
-    return construct_ui(config, run_func, None)
+    def export_func():
+        name = (
+            f"{pipeline_def.__module__}.{pipeline_def.__name__}_{datetime.now()}.xlsx"
+        )
+        path = str(exported_dir / name)
+        gr.Info(f"Begin exporting {name}...")
+        try:
+            export(config=config, pipeline_def=pipeline_def, output_path=path)
+        except Exception as e:
+            raise gr.Error(f"Failed to export. Please contact the project's AIR: {e}")
+        gr.Info(f"Exported {name}. Please go to the `Exported file` tab to download")
+        return path
+
+    return construct_ui(config, run_func, export_func)
@@ -148,4 +185,6 @@ def build_from_dict(config: Union[str, dict]):
     else:
         demo = gr.TabbedInterface(demos, list(config_dict.keys()))
 
+    demo.queue()
+
     return demo
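For context, the exporter walks the on-disk layout that theflow and the new `run_func` produce under the pipeline's `store_result` directory, one sub-directory per run (a sketch; the run ids are illustrative):

    <store_result>/
        <run_id>/
            progress.pkl   # per-step inputs/outputs, written by theflow
            params.pkl     # UI params, written by run_func above
    <store_result>/../exported/
        <module>.<Pipeline>_<timestamp>.xlsx   # written by export_func

`demo.queue()` routes events through Gradio's queue, which long-running exports and the `gr.Info`/`gr.Error` notifications benefit from.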
temp("Outputs"): + for component in outputs: + component.render() + with temp("Exported file"): + exported_file.render() return demo @@ -103,6 +117,10 @@ def build_pipeline_ui(config: dict, pipeline_def): params_name = list(config.get("params", {}).keys()) outputs_def = config.get("outputs", []) + output_dir: Path = Path(storage.url(pipeline_def().config.store_result)) + exported_dir = output_dir.parent / "exported" + exported_dir.mkdir(parents=True, exist_ok=True) + def run_func(*args): inputs = { name: value for name, value in zip(inputs_name, args[: len(inputs_name)]) @@ -113,6 +131,13 @@ def build_pipeline_ui(config: dict, pipeline_def): pipeline = pipeline_def() pipeline.set(params) pipeline(**inputs) + with storage.open( + storage.url( + pipeline.config.store_result, pipeline.last_run.id(), "params.pkl" + ), + "wb", + ) as f: + pickle.dump(params, f) if outputs_def: outputs = [] for output_def in outputs_def: @@ -122,8 +147,20 @@ def build_pipeline_ui(config: dict, pipeline_def): outputs.append(output) return outputs - # TODO: export_func is None for now - return construct_ui(config, run_func, None) + def export_func(): + name = ( + f"{pipeline_def.__module__}.{pipeline_def.__name__}_{datetime.now()}.xlsx" + ) + path = str(exported_dir / name) + gr.Info(f"Begin exporting {name}...") + try: + export(config=config, pipeline_def=pipeline_def, output_path=path) + except Exception as e: + raise gr.Error(f"Failed to export. Please contact project's AIR: {e}") + gr.Info(f"Exported {name}. Please go to the `Exported file` tab to download") + return path + + return construct_ui(config, run_func, export_func) def build_from_dict(config: Union[str, dict]): @@ -148,4 +185,6 @@ def build_from_dict(config: Union[str, dict]): else: demo = gr.TabbedInterface(demos, list(config_dict.keys())) + demo.queue() + return demo diff --git a/setup.py b/setup.py index 819d363..81bb68a 100644 --- a/setup.py +++ b/setup.py @@ -35,6 +35,7 @@ setuptools.setup( "llama-hub", "nltk", "gradio", + "openpyxl", ], extras_require={ "dev": [ diff --git a/tests/simple_pipeline.py b/tests/simple_pipeline.py new file mode 100644 index 0000000..98e3a37 --- /dev/null +++ b/tests/simple_pipeline.py @@ -0,0 +1,43 @@ +import tempfile +from typing import List + +from theflow import Node + +from kotaemon.base import BaseComponent +from kotaemon.embeddings import AzureOpenAIEmbeddings +from kotaemon.llms.completions.openai import AzureOpenAI +from kotaemon.pipelines.retrieving import RetrieveDocumentFromVectorStorePipeline +from kotaemon.vectorstores import ChromaVectorStore + + +class Pipeline(BaseComponent): + vectorstore_path: str = str(tempfile.mkdtemp()) + llm: Node[AzureOpenAI] = Node( + default=AzureOpenAI, + default_kwargs={ + "openai_api_base": "https://test.openai.azure.com/", + "openai_api_key": "some-key", + "openai_api_version": "2023-03-15-preview", + "deployment_name": "gpt35turbo", + "temperature": 0, + "request_timeout": 60, + }, + ) + + @Node.decorate(depends_on=["vectorstore_path"]) + def retrieving_pipeline(self): + vector_store = ChromaVectorStore(self.vectorstore_path) + embedding = AzureOpenAIEmbeddings( + model="text-embedding-ada-002", + deployment="embedding-deployment", + openai_api_base="https://test.openai.azure.com/", + openai_api_key="some-key", + ) + + return RetrieveDocumentFromVectorStorePipeline( + vector_store=vector_store, embedding=embedding + ) + + def run_raw(self, text: str) -> str: + matched_texts: List[str] = self.retrieving_pipeline(text) + return self.llm("\n".join(matched_texts)).text[0] 
diff --git a/tests/test_promptui.py b/tests/test_promptui.py index 74468e9..99b0913 100644 --- a/tests/test_promptui.py +++ b/tests/test_promptui.py @@ -1,66 +1,14 @@ -import pytest - from kotaemon.contribs.promptui.config import export_pipeline_to_config +from kotaemon.contribs.promptui.export import export_from_dict from kotaemon.contribs.promptui.ui import build_from_dict - -@pytest.fixture() -def simple_pipeline_cls(tmp_path): - """Create a pipeline class that can be used""" - from typing import List - - from theflow import Node - - from kotaemon.base import BaseComponent - from kotaemon.embeddings import AzureOpenAIEmbeddings - from kotaemon.llms.completions.openai import AzureOpenAI - from kotaemon.pipelines.retrieving import ( - RetrieveDocumentFromVectorStorePipeline, - ) - from kotaemon.vectorstores import ChromaVectorStore - - class Pipeline(BaseComponent): - vectorstore_path: str = str(tmp_path) - llm: Node[AzureOpenAI] = Node( - default=AzureOpenAI, - default_kwargs={ - "openai_api_base": "https://test.openai.azure.com/", - "openai_api_key": "some-key", - "openai_api_version": "2023-03-15-preview", - "deployment_name": "gpt35turbo", - "temperature": 0, - "request_timeout": 60, - }, - ) - - @Node.decorate(depends_on=["vectorstore_path"]) - def retrieving_pipeline(self): - vector_store = ChromaVectorStore(self.vectorstore_path) - embedding = AzureOpenAIEmbeddings( - model="text-embedding-ada-002", - deployment="embedding-deployment", - openai_api_base="https://test.openai.azure.com/", - openai_api_key="some-key", - ) - - return RetrieveDocumentFromVectorStorePipeline( - vector_store=vector_store, embedding=embedding - ) - - def run_raw(self, text: str) -> str: - matched_texts: List[str] = self.retrieving_pipeline(text) - return self.llm("\n".join(matched_texts)).text[0] - - return Pipeline - - -Pipeline = simple_pipeline_cls +from .simple_pipeline import Pipeline class TestPromptConfig: - def test_export_prompt_config(self, simple_pipeline_cls): + def test_export_prompt_config(self): """Test if the prompt config is exported correctly""" - pipeline = simple_pipeline_cls() + pipeline = Pipeline() config_dict = export_pipeline_to_config(pipeline) config = list(config_dict.values())[0] @@ -78,9 +26,42 @@ class TestPromptConfig: class TestPromptUI: - def test_uigeneration(self, simple_pipeline_cls): + def test_uigeneration(self): """Test if the gradio UI is exposed without any problem""" - pipeline = simple_pipeline_cls() + pipeline = Pipeline() config = export_pipeline_to_config(pipeline) build_from_dict(config) + + +class TestExport: + def test_export(self, tmp_path): + """Test if the export functionality works without error""" + from pathlib import Path + + import yaml + from theflow.storage import storage + + config_path = tmp_path / "config.yaml" + pipeline = Pipeline() + Path(storage.url(pipeline.config.store_result)).mkdir( + parents=True, exist_ok=True + ) + + config_dict = export_pipeline_to_config(pipeline) + pipeline_name = list(config_dict.keys())[0] + + config_dict[pipeline_name]["logs"] = { + "sheet1": { + "inputs": [{"name": "text", "step": ".", "variable": "text"}], + "outputs": [{"name": "answer", "step": "."}], + }, + } + with open(config_path, "w") as f: + yaml.safe_dump(config_dict, f) + + export_from_dict( + config=str(config_path), + pipeline=pipeline_name, + output_path=str(tmp_path / "exported.xlsx"), + )
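Finally, the same export can be driven headlessly, mirroring `TestExport.test_export` above (a sketch; the output path is illustrative, and it assumes some runs have already been logged, e.g. via the UI):

    from kotaemon.contribs.promptui.config import export_pipeline_to_config
    from kotaemon.contribs.promptui.export import export_from_dict

    from tests.simple_pipeline import Pipeline

    # attach a `logs` section to the generated config, then export: one Excel
    # sheet per entry in `logs`, one row per run found under `store_result`
    config_dict = export_pipeline_to_config(Pipeline())
    pipeline_name = list(config_dict.keys())[0]
    config_dict[pipeline_name]["logs"] = {
        "sheet1": {
            "inputs": [{"name": "text", "step": ".", "variable": "text"}],
            "outputs": [{"name": "answer", "step": "."}],
        },
    }
    export_from_dict(
        config=config_dict,
        pipeline=pipeline_name,
        output_path="exported.xlsx",
    )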