[AUR-338, AUR-406, AUR-407] Export pipeline to config for PromptUI. Construct PromptUI dynamically based on config. (#16)
From pipeline > config > UI. Provide an example project for promptui. - Pipeline to config: `kotaemon.contribs.promptui.config.export_pipeline_to_config`. The config follows the schema specified in this document: https://cinnamon-ai.atlassian.net/wiki/spaces/ATM/pages/2748711193/Technical+Detail. Note: this implementation excludes the logs, which will be handled in AUR-408. - Config to UI: `kotaemon.contribs.promptui.build_from_yaml`. - The example project is located at `examples/promptui/`.
This commit is contained in:
committed by
GitHub
parent
c329c4c03f
commit
c6dd01e820
20
knowledgehub/contribs/promptui/base.py
Normal file
20
knowledgehub/contribs/promptui/base.py
Normal file
@@ -0,0 +1,20 @@
|
||||
import gradio as gr
|
||||
|
||||
# Registry of the gradio component classes that a config may name in its
# `component` field. Note that "checkbox" resolves to `CheckboxGroup`.
COMPONENTS_CLASS = dict(
    text=gr.components.Textbox,
    checkbox=gr.components.CheckboxGroup,
    dropdown=gr.components.Dropdown,
    file=gr.components.File,
    image=gr.components.Image,
    number=gr.components.Number,
    radio=gr.components.Radio,
    slider=gr.components.Slider,
)

# Names accepted as a `component` value (the keys of the registry above).
SUPPORTED_COMPONENTS = set(COMPONENTS_CLASS)

# Default component name for a value, keyed by the name of the value's Python
# type.
# NOTE(review): "bool" resolves to "checkbox", i.e. `CheckboxGroup` -- confirm
# that a group (rather than a single checkbox) is intended for boolean values.
DEFAULT_COMPONENT_BY_TYPES = {
    "str": "text",
    "bool": "checkbox",
    "int": "number",
    "float": "number",
    "list": "dropdown",
}
|
@@ -1 +1,132 @@
|
||||
"""Get config from Pipeline"""
|
||||
import inspect
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional, Type, Union
|
||||
|
||||
import yaml
|
||||
|
||||
from ...base import BaseComponent
|
||||
from .base import DEFAULT_COMPONENT_BY_TYPES
|
||||
|
||||
|
||||
def config_from_value(value: Any) -> dict:
    """Build a promptui component config for a plain default value

    The UI component is looked up in `DEFAULT_COMPONENT_BY_TYPES` using the name
    of the value's type; a type without an entry falls back to "text".

    Args:
        value (Any): default value

    Returns:
        dict: config holding the chosen `component` and the value under `params`
    """
    type_name = type(value).__name__
    ui_component = DEFAULT_COMPONENT_BY_TYPES.get(type_name, "text")
    config = {"component": ui_component, "params": {"value": value}}
    return config
|
||||
|
||||
|
||||
def handle_param(param: dict) -> dict:
    """Convert param definition into promptui-compliant config

    The component is chosen from the type of the param's `default` through
    `DEFAULT_COMPONENT_BY_TYPES`, falling back to "text". A missing default, or
    a string default wrapped in `{{` ... `}}`, is treated as "no default": no
    `value` is emitted for it.

    Supported gradio's UI components are (https://www.gradio.app/docs/components)
        - CheckBoxGroup: list (multi select)
        - DropDown: list (single select)
        - File
        - Image
        - Number: int / float
        - Radio: list (single select)
        - Slider: int / float
        - TextBox: str

    Args:
        param (dict): param definition; only its `default` entry is read

    Returns:
        dict: config with `component` and `params` entries
    """
    default = param.get("default")
    is_placeholder = (
        isinstance(default, str)
        and default.startswith("{{")
        and default.endswith("}}")
    )

    if default is None or is_placeholder:
        # No usable default: look up the empty type name, which resolves to the
        # "text" fallback, and emit no value.
        return {
            "component": DEFAULT_COMPONENT_BY_TYPES.get("", "text"),
            "params": {},
        }

    return {
        "component": DEFAULT_COMPONENT_BY_TYPES.get(type(default).__name__, "text"),
        "params": {"value": default},
    }
|
||||
|
||||
|
||||
def handle_node(node: dict) -> dict:
    """Convert node definition into promptui-compliant config

    Params of the node are exported under their own name. Child nodes are
    handled recursively and their entries are exported under dotted names
    (`<child name>.<key>`). Any param or child node whose `default_callback` is
    a string is left out of the config.

    Args:
        node (dict): node definition with optional `params` and `nodes` entries

    Returns:
        dict: flat mapping from (dotted) name to component config
    """
    config = {
        name: handle_param(param_def)
        for name, param_def in node.get("params", {}).items()
        if not isinstance(param_def["default_callback"], str)
    }

    for child_name, child_def in node.get("nodes", {}).items():
        if isinstance(child_def["default_callback"], str):
            continue

        prefix = f"{child_name}."
        # config of the child's default node first ...
        config.update(
            (prefix + key, value)
            for key, value in handle_node(child_def["default"]).items()
        )
        # ... then its default kwargs, which win when the dotted names collide
        config.update(
            (prefix + key, config_from_value(value))
            for key, value in child_def["default_kwargs"].items()
        )

    return config
|
||||
|
||||
|
||||
def handle_input(pipeline: Union[BaseComponent, Type[BaseComponent]]) -> dict:
    """Get the input from the pipeline

    Inspects the signature of `pipeline.run_raw` and emits one "text" component
    config per argument. An argument with a default value also gets that
    default as the component's `value`.

    Args:
        pipeline: pipeline class or instance; anything without a `run_raw`
            attribute yields an empty config

    Returns:
        dict: mapping from argument name to component config
    """
    if not hasattr(pipeline, "run_raw"):
        return {}

    # `*args` / `**kwargs` style parameters cannot be shown as a single UI
    # field, whatever they are named
    variadic_kinds = (
        inspect.Parameter.VAR_POSITIONAL,
        inspect.Parameter.VAR_KEYWORD,
    )

    inputs: Dict[str, Dict] = {}
    for name, param in inspect.signature(pipeline.run_raw).parameters.items():
        if name in ("self", "args", "kwargs") or param.kind in variadic_kinds:
            continue

        # Every input is a text component. This also holds when the default is
        # None: emitting `component: None` would produce a config that the UI
        # builder rejects as an unsupported component.
        input_def: Dict[str, Any] = {"component": "text"}
        if param.default is not param.empty:
            input_def["params"] = {"value": param.default}

        inputs[name] = input_def

    return inputs
|
||||
|
||||
|
||||
def export_pipeline_to_config(
    pipeline: Union[BaseComponent, Type[BaseComponent]],
    path: Optional[str] = None,
) -> dict:
    """Export a pipeline to a promptui-compliant config dict

    The config has a single top-level key, `<module>.<class name>` of the
    pipeline, holding its `params`, `inputs` and `outputs` definitions.

    Args:
        pipeline: pipeline instance, or pipeline class (which is instantiated
            without arguments)
        path: if given, the config is also written to this YAML file. When the
            file already exists, its other top-level entries are kept and the
            entry of this pipeline is replaced.

    Returns:
        dict: config of this pipeline only (not the merged file content)
    """
    if inspect.isclass(pipeline):
        pipeline = pipeline()

    pipeline_def = pipeline.describe()
    config = {
        f"{pipeline.__module__}.{pipeline.__class__.__name__}": {
            "params": handle_node(pipeline_def),
            "inputs": handle_input(pipeline),
            "outputs": [{"step": ".", "component": "text"}],
        }
    }

    if path is not None:
        merged: dict = {}
        if Path(path).is_file():
            with open(path, encoding="utf-8") as f:
                # an existing but empty YAML file loads as None
                merged = yaml.safe_load(f) or {}
        merged.update(config)
        with open(path, "w", encoding="utf-8") as f:
            yaml.safe_dump(merged, f)

    return config
|
||||
|
@@ -1,6 +1,151 @@
|
||||
"""Create UI from config file. Execute the UI from config file
|
||||
from typing import Union
|
||||
|
||||
- Can do now: Log from stdout to UI
|
||||
- In the future, we can provide some hooks and callbacks to let developers better
|
||||
fine-tune the UI behavior.
|
||||
"""
|
||||
import gradio as gr
|
||||
import yaml
|
||||
from theflow.utils.modules import import_dotted_string
|
||||
|
||||
from kotaemon.contribs.promptui.base import COMPONENTS_CLASS, SUPPORTED_COMPONENTS
|
||||
|
||||
# Help text for users who run into errors.
USAGE_INSTRUCTION = (
    "In case of errors, you can:\n"
    "\n"
    "- Create bug fix and make PR at: https://github.com/Cinnamon/kotaemon\n"
    "- Ping any of @john @tadashi @ian @jacky in Slack channel #llm-productization"
)
|
||||
|
||||
|
||||
def get_component(component_def: dict) -> gr.components.Component:
    """Get the component based on component definition

    Args:
        component_def (dict): definition with a `component` name and optional
            `params`, which are passed as keyword arguments to the component
            class

    Returns:
        the instantiated gradio component

    Raises:
        ValueError: if `component` is missing or is not a supported name
    """
    if "component" not in component_def:
        raise ValueError(
            f"Cannot decide the component from {component_def}. "
            "Please specify `component` with 1 of the following "
            f"values: {SUPPORTED_COMPONENTS}"
        )

    kind = component_def["component"]
    if kind not in SUPPORTED_COMPONENTS:
        raise ValueError(
            f"Unsupported UI component: {kind}. "
            f"Must be one of {SUPPORTED_COMPONENTS}"
        )

    factory = COMPONENTS_CLASS[kind]
    kwargs = component_def.get("params", {})
    return factory(**kwargs)
|
||||
|
||||
|
||||
def _build_components(labelled_defs, interactive: bool) -> list:
    """Create one gradio component per (fallback label, definition) pair"""
    components = []
    for label, component_def in labelled_defs:
        # Work on a copy so that the caller's config is not modified
        params = dict(component_def.get("params") or {})
        params["interactive"] = interactive
        component = get_component({**component_def, "params": params})
        # Only fill in the label when the definition did not provide one
        if hasattr(component, "label") and not component.label:  # type: ignore
            component.label = label  # type: ignore
        components.append(component)
    return components


def construct_ui(config, func_run, func_export) -> gr.Blocks:
    """Create UI from config file. Execute the UI from config file

    - Can do now: Log from stdout to UI
    - In the future, we can provide some hooks and callbacks to let developers better
    fine-tune the UI behavior.

    Args:
        config: config of a single pipeline, with optional `inputs` (dict),
            `params` (dict) and `outputs` (list) component definitions. The
            config is not modified.
        func_run: callback of the "Run" button; it receives the values of the
            input components followed by the values of the param components,
            and its result fills the output components
        func_export: callback of the "Export" button, called without inputs

    Returns:
        gr.Blocks: the constructed UI
    """
    # Inputs and params are editable and labelled with their config name;
    # outputs are read-only and labelled with their position.
    inputs = _build_components(config.get("inputs", {}).items(), interactive=True)
    params = _build_components(config.get("params", {}).items(), interactive=True)
    outputs = _build_components(
        (
            (f"Output {idx}", component_def)
            for idx, component_def in enumerate(config.get("outputs", []))
        ),
        interactive=False,
    )

    with gr.Blocks(analytics_enabled=False, title="Welcome to PromptUI") as demo:
        with gr.Accordion(label="Usage", open=False):
            gr.Markdown(USAGE_INSTRUCTION)
        with gr.Row():
            run_btn = gr.Button("Run")
            run_btn.click(func_run, inputs=inputs + params, outputs=outputs)
            export_btn = gr.Button("Export")
            export_btn.click(func_export, inputs=None, outputs=None)
        with gr.Row():
            # The components were created above, outside of the layout, and
            # are placed here explicitly with `render()`.
            with gr.Column():
                with gr.Tab("Inputs"):
                    for component in inputs:
                        component.render()
                with gr.Tab("Params"):
                    for component in params:
                        component.render()
            with gr.Column():
                for component in outputs:
                    component.render()

    return demo
|
||||
|
||||
|
||||
def build_pipeline_ui(config: dict, pipeline_def):
    """Build a tab from config file

    Args:
        config (dict): config of a single pipeline (`inputs`, `params` and
            `outputs` entries)
        pipeline_def: callable creating a fresh pipeline; it is called without
            arguments on every run

    Returns:
        the UI built by `construct_ui` for this pipeline
    """
    input_names = list(config.get("inputs", {}))
    param_names = list(config.get("params", {}))
    output_defs = config.get("outputs", [])

    def run_func(*args):
        # The UI passes the input values first, followed by the param values
        n_inputs = len(input_names)
        inputs = dict(zip(input_names, args[:n_inputs]))
        params = dict(zip(param_names, args[n_inputs:]))

        pipeline = pipeline_def()
        pipeline.set(params)
        pipeline(**inputs)

        if not output_defs:
            return None

        # Each output is read from the log of the step named in its definition,
        # optionally narrowed down to one item of that log
        results = []
        for output_def in output_defs:
            logged = pipeline.last_run.logs(output_def["step"])
            results.append(
                logged[output_def["item"]] if "item" in output_def else logged
            )
        return results

    # TODO: export_func is None for now
    return construct_ui(config, run_func, None)
|
||||
|
||||
|
||||
def build_from_dict(config: Union[str, dict]):
    """Build a full UI from a config dict or from a YAML config file

    Each top-level key of the config is the dotted import path of a pipeline
    and each value is the UI config of that pipeline.

    Args:
        config: path to a YAML config file, or the already-loaded config dict

    Returns:
        the UI of the pipeline if the config holds exactly one pipeline,
        otherwise a tabbed interface with one tab per pipeline

    Raises:
        ValueError: if `config` is neither a str nor a dict, or if the YAML
            file does not contain a mapping (e.g. it is empty)
    """
    if isinstance(config, str):
        with open(config, encoding="utf-8") as f:
            config_dict = yaml.safe_load(f)
        # An empty YAML file loads as None; fail with a clear message instead
        # of an AttributeError further down
        if not isinstance(config_dict, dict):
            raise ValueError(
                f"YAML config {config} must contain a mapping, "
                f"got {type(config_dict)}"
            )
    elif isinstance(config, dict):
        config_dict = config
    else:
        raise ValueError(
            f"config must be either a yaml path or a dict, got {type(config)}"
        )

    demos = [
        build_pipeline_ui(value, import_dotted_string(key, safe=False))
        for key, value in config_dict.items()
    ]
    if len(demos) == 1:
        return demos[0]

    return gr.TabbedInterface(demos, list(config_dict.keys()))
|
||||
|
@@ -1,4 +1,4 @@
|
||||
from .base import BaseDocumentStore
|
||||
from .simple import InMemoryDocumentStore
|
||||
from .in_memory import InMemoryDocumentStore
|
||||
|
||||
__all__ = ["BaseDocumentStore", "InMemoryDocumentStore"]
|
||||
|
@@ -10,7 +10,7 @@ class InMemoryDocumentStore(BaseDocumentStore):
|
||||
"""Simple memory document store that store document in a dictionary"""
|
||||
|
||||
def __init__(self):
|
||||
self.store = {}
|
||||
self._store = {}
|
||||
|
||||
def add(
|
||||
self,
|
||||
@@ -32,20 +32,20 @@ class InMemoryDocumentStore(BaseDocumentStore):
|
||||
docs = [docs]
|
||||
|
||||
for doc_id, doc in zip(doc_ids, docs):
|
||||
if doc_id in self.store and not exist_ok:
|
||||
if doc_id in self._store and not exist_ok:
|
||||
raise ValueError(f"Document with id {doc_id} already exist")
|
||||
self.store[doc_id] = doc
|
||||
self._store[doc_id] = doc
|
||||
|
||||
def get(self, ids: Union[List[str], str]) -> List[Document]:
|
||||
"""Get document by id"""
|
||||
if not isinstance(ids, list):
|
||||
ids = [ids]
|
||||
|
||||
return [self.store[doc_id] for doc_id in ids]
|
||||
return [self._store[doc_id] for doc_id in ids]
|
||||
|
||||
def get_all(self) -> dict:
|
||||
"""Get all documents"""
|
||||
return self.store
|
||||
return self._store
|
||||
|
||||
def delete(self, ids: Union[List[str], str]):
|
||||
"""Delete document by id"""
|
||||
@@ -53,11 +53,11 @@ class InMemoryDocumentStore(BaseDocumentStore):
|
||||
ids = [ids]
|
||||
|
||||
for doc_id in ids:
|
||||
del self.store[doc_id]
|
||||
del self._store[doc_id]
|
||||
|
||||
def save(self, path: Union[str, Path]):
|
||||
"""Save document to path"""
|
||||
store = {key: value.to_dict() for key, value in self.store.items()}
|
||||
store = {key: value.to_dict() for key, value in self._store.items()}
|
||||
with open(path, "w") as f:
|
||||
json.dump(store, f)
|
||||
|
||||
@@ -65,4 +65,4 @@ class InMemoryDocumentStore(BaseDocumentStore):
|
||||
"""Load document store from path"""
|
||||
with open(path) as f:
|
||||
store = json.load(f)
|
||||
self.store = {key: Document.from_dict(value) for key, value in store.items()}
|
||||
self._store = {key: Document.from_dict(value) for key, value in store.items()}
|
@@ -1,4 +1,5 @@
|
||||
from haystack.schema import Document as HaystackDocument
|
||||
from llama_index.bridge.pydantic import Field
|
||||
from llama_index.schema import Document as BaseDocument
|
||||
|
||||
SAMPLE_TEXT = "A sample Document from kotaemon"
|
||||
@@ -20,3 +21,17 @@ class Document(BaseDocument):
|
||||
metadata = self.metadata or {}
|
||||
text = self.text
|
||||
return HaystackDocument(content=text, meta=metadata)
|
||||
|
||||
|
||||
class RetrievedDocument(Document):
    """Subclass of Document with retrieval-related information

    Attributes:
        score (float): score of the document (from 0.0 to 1.0)
        retrieval_metadata (dict): metadata from the retrieval process, can be used
            by different components in a retrieved pipeline to communicate with each
            other
    """

    score: float = Field(default=0.0)
    # a factory rather than a `{}` literal, so that the default is visibly a
    # fresh dict for every document
    retrieval_metadata: dict = Field(default_factory=dict)
|
||||
|
@@ -27,7 +27,7 @@ class LangchainLLM(LLM):
|
||||
self._kwargs[param] = params.pop(param)
|
||||
super().__init__(**params)
|
||||
|
||||
@Param.decorate()
|
||||
@Param.decorate(no_cache=True)
|
||||
def agent(self):
|
||||
return self._lc_class(**self._kwargs)
|
||||
|
||||
|
@@ -1,8 +1,10 @@
|
||||
from typing import List
|
||||
import uuid
|
||||
from typing import List, Optional
|
||||
|
||||
from theflow import Node, Param
|
||||
|
||||
from ..base import BaseComponent
|
||||
from ..docstores import BaseDocumentStore
|
||||
from ..documents.base import Document
|
||||
from ..embeddings import BaseEmbeddings
|
||||
from ..vectorstores import BaseVectorStore
|
||||
@@ -18,21 +20,30 @@ class IndexVectorStoreFromDocumentPipeline(BaseComponent):
|
||||
"""
|
||||
|
||||
vector_store: Param[BaseVectorStore] = Param()
|
||||
doc_store: Optional[BaseDocumentStore] = None
|
||||
embedding: Node[BaseEmbeddings] = Node()
|
||||
# TODO: populate to document store as well when it's finished
|
||||
|
||||
# TODO: refer to llama_index's storage as well
|
||||
|
||||
def run_raw(self, text: str) -> None:
|
||||
self.vector_store.add([self.embedding(text)])
|
||||
document = Document(text=text, id_=str(uuid.uuid4()))
|
||||
self.run_batch_document([document])
|
||||
|
||||
def run_batch_raw(self, text: List[str]) -> None:
|
||||
self.vector_store.add(self.embedding(text))
|
||||
documents = [Document(t, id_=str(uuid.uuid4())) for t in text]
|
||||
self.run_batch_document(documents)
|
||||
|
||||
def run_document(self, text: Document) -> None:
|
||||
self.vector_store.add([self.embedding(text)])
|
||||
self.run_batch_document([text])
|
||||
|
||||
def run_batch_document(self, text: List[Document]) -> None:
|
||||
self.vector_store.add(self.embedding(text))
|
||||
embeddings = self.embedding(text)
|
||||
self.vector_store.add(
|
||||
embeddings=embeddings,
|
||||
ids=[t.id_ for t in text],
|
||||
)
|
||||
if self.doc_store:
|
||||
self.doc_store.add(text)
|
||||
|
||||
def is_document(self, text) -> bool:
|
||||
if isinstance(text, Document):
|
||||
|
@@ -1,47 +1,87 @@
|
||||
from typing import List
|
||||
from abc import abstractmethod
|
||||
from typing import List, Optional
|
||||
|
||||
from theflow import Node, Param
|
||||
|
||||
from ..base import BaseComponent
|
||||
from ..documents.base import Document
|
||||
from ..docstores import BaseDocumentStore
|
||||
from ..documents.base import Document, RetrievedDocument
|
||||
from ..embeddings import BaseEmbeddings
|
||||
from ..vectorstores import BaseVectorStore
|
||||
|
||||
|
||||
class RetrieveDocumentFromVectorStorePipeline(BaseComponent):
|
||||
class BaseRetrieval(BaseComponent):
    """Define the base interface of a retrieval pipeline

    Four entry points are declared, one per combination of input kind (raw
    string or `Document`) and arity (single or batch). All of them take a
    `top_k` argument that defaults to 1.
    """

    @abstractmethod
    def run_raw(self, text: str, top_k: int = 1) -> List[RetrievedDocument]:
        """Retrieve documents for a single raw string"""

    @abstractmethod
    def run_batch_raw(
        self, text: List[str], top_k: int = 1
    ) -> List[List[RetrievedDocument]]:
        """Retrieve documents for a list of raw strings, one result list each"""

    @abstractmethod
    def run_document(self, text: Document, top_k: int = 1) -> List[RetrievedDocument]:
        """Retrieve documents for a single `Document`"""

    @abstractmethod
    def run_batch_document(
        self, text: List[Document], top_k: int = 1
    ) -> List[List[RetrievedDocument]]:
        """Retrieve documents for a list of `Document`, one result list each"""
|
||||
|
||||
|
||||
class RetrieveDocumentFromVectorStorePipeline(BaseRetrieval):
|
||||
"""Retrieve list of documents from vector store"""
|
||||
|
||||
vector_store: Param[BaseVectorStore] = Param()
|
||||
doc_store: Optional[BaseDocumentStore] = None
|
||||
embedding: Node[BaseEmbeddings] = Node()
|
||||
# TODO: populate to document store as well when it's finished
|
||||
# TODO: refer to llama_index's storage as well
|
||||
|
||||
def run_raw(self, text: str) -> List[str]:
|
||||
emb = self.embedding(text)
|
||||
return self.vector_store.query(embedding=emb)[2]
|
||||
def run_raw(self, text: str, top_k: int = 1) -> List[RetrievedDocument]:
|
||||
return self.run_batch_raw([text], top_k=top_k)[0]
|
||||
|
||||
def run_batch_raw(
|
||||
self, text: List[str], top_k: int = 1
|
||||
) -> List[List[RetrievedDocument]]:
|
||||
if self.doc_store is None:
|
||||
raise ValueError(
|
||||
"doc_store is not provided. Please provide a doc_store to "
|
||||
"retrieve the documents"
|
||||
)
|
||||
|
||||
def run_batch_raw(self, text: List[str]) -> List[List[str]]:
|
||||
result = []
|
||||
for each_text in text:
|
||||
emb = self.embedding(each_text)
|
||||
result.append(self.vector_store.query(embedding=emb)[2])
|
||||
_, scores, ids = self.vector_store.query(embedding=emb, top_k=top_k)
|
||||
docs = self.doc_store.get(ids)
|
||||
each_result = [
|
||||
RetrievedDocument(**doc.to_dict(), score=score)
|
||||
for doc, score in zip(docs, scores)
|
||||
]
|
||||
result.append(each_result)
|
||||
return result
|
||||
|
||||
def run_document(self, text: Document) -> List[str]:
|
||||
return self.run_raw(text.text)
|
||||
def run_document(self, text: Document, top_k: int = 1) -> List[RetrievedDocument]:
|
||||
return self.run_raw(text.text, top_k)
|
||||
|
||||
def run_batch_document(self, text: List[Document]) -> List[List[str]]:
|
||||
input_text = [each.text for each in text]
|
||||
return self.run_batch_raw(input_text)
|
||||
def run_batch_document(
|
||||
self, text: List[Document], top_k: int = 1
|
||||
) -> List[List[RetrievedDocument]]:
|
||||
return self.run_batch_raw(text=[t.text for t in text], top_k=top_k)
|
||||
|
||||
def is_document(self, text) -> bool:
|
||||
def is_document(self, text, *args, **kwargs) -> bool:
|
||||
if isinstance(text, Document):
|
||||
return True
|
||||
elif isinstance(text, List) and isinstance(text[0], Document):
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_batch(self, text) -> bool:
|
||||
def is_batch(self, text, *args, **kwargs) -> bool:
|
||||
if isinstance(text, list):
|
||||
return True
|
||||
return False
|
||||
|
@@ -144,8 +144,8 @@ class LlamaIndexVectorStore(BaseVectorStore):
|
||||
query_embedding=embedding,
|
||||
similarity_top_k=top_k,
|
||||
node_ids=ids,
|
||||
**kwargs,
|
||||
),
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
embeddings = []
|
||||
|
Reference in New Issue
Block a user