Create user management functionality (#152)

* Create user management page
* Remove old user creating UI
* Add username validation; admin user auto-creation
* Provide docs on user management
* Bump version
This commit is contained in:
Duc Nguyen (john) 2024-03-07 14:19:37 +07:00 committed by GitHub
parent 8a90fcfc99
commit 9725d60791
13 changed files with 469 additions and 114 deletions

View File

@ -0,0 +1,14 @@
`ktem` provides user management as an extension. To enable user management, in
your `flowsettings.py`, set the following variables:
- `KH_FEATURE_USER_MANAGEMENT`: True to enable.
- `KH_FEATURE_USER_MANAGEMENT_ADMIN`: the admin username. This user will be
created when the app 1st start.
- `KH_FEATURE_USER_MANAGEMENT_PASSWORD`: the admin password. This value
accompanies the admin username.
Once enabled, you have access to the following features:
- User login/logout (located in Settings Tab)
- User changing password (located in Settings Tab)
- Create / List / Edit / Delete user (located in Admin > User Management Tab)

View File

@ -12,6 +12,13 @@ user_cache_dir.mkdir(parents=True, exist_ok=True)
COHERE_API_KEY = config("COHERE_API_KEY", default="") COHERE_API_KEY = config("COHERE_API_KEY", default="")
KH_MODE = "dev" KH_MODE = "dev"
KH_FEATURE_USER_MANAGEMENT = True
KH_FEATURE_USER_MANAGEMENT_ADMIN = str(
config("KH_FEATURE_USER_MANAGEMENT_ADMIN", default="admin")
)
KH_FEATURE_USER_MANAGEMENT_PASSWORD = str(
config("KH_FEATURE_USER_MANAGEMENT_PASSWORD", default="XsdMbe8zKP8KdeE@")
)
KH_ENABLE_ALEMBIC = False KH_ENABLE_ALEMBIC = False
KH_DATABASE = f"sqlite:///{user_cache_dir / 'sql.db'}" KH_DATABASE = f"sqlite:///{user_cache_dir / 'sql.db'}"
KH_DOCSTORE = { KH_DOCSTORE = {

View File

@ -32,6 +32,7 @@ class BaseApp:
def __init__(self): def __init__(self):
self.dev_mode = getattr(settings, "KH_MODE", "") == "dev" self.dev_mode = getattr(settings, "KH_MODE", "") == "dev"
self.f_user_management = getattr(settings, "KH_FEATURE_USER_MANAGEMENT", False)
self._theme = gr.themes.Base( self._theme = gr.themes.Base(
font=("ui-sans-serif", "system-ui", "sans-serif"), font=("ui-sans-serif", "system-ui", "sans-serif"),
font_mono=("ui-monospace", "Consolas", "monospace"), font_mono=("ui-monospace", "Consolas", "monospace"),
@ -60,7 +61,7 @@ class BaseApp:
self.default_settings.index.finalize() self.default_settings.index.finalize()
self.settings_state = gr.State(self.default_settings.flatten()) self.settings_state = gr.State(self.default_settings.flatten())
self.user_id = gr.State(1 if self.dev_mode else None) self.user_id = gr.State(1 if not self.f_user_management else None)
def initialize_indices(self): def initialize_indices(self):
"""Create the index manager, start indices, and register to app settings""" """Create the index manager, start indices, and register to app settings"""

View File

@ -28,7 +28,7 @@ footer {
border: none !important; border: none !important;
} }
#chat-tab, #settings-tab, #help-tab { #chat-tab, #settings-tab, #help-tab, #admin-tab {
border: none !important; border: none !important;
} }

View File

@ -48,7 +48,9 @@ class BaseUser(SQLModel):
id: Optional[int] = Field(default=None, primary_key=True) id: Optional[int] = Field(default=None, primary_key=True)
username: str = Field(unique=True) username: str = Field(unique=True)
username_lower: str = Field(unique=True)
password: str password: str
admin: bool = Field(default=False)
class BaseSettings(SQLModel): class BaseSettings(SQLModel):

View File

@ -405,12 +405,6 @@ class FileIndexPage(BasePage):
name=list_files["name"][ev.index[0]] name=list_files["name"][ev.index[0]]
) )
def delete(self, file_id):
pass
def cancel_delete(self):
pass
class FileSelector(BasePage): class FileSelector(BasePage):
"""File selector UI in the Chat page""" """File selector UI in the Chat page"""

View File

@ -1,5 +1,6 @@
import gradio as gr import gradio as gr
from ktem.app import BaseApp from ktem.app import BaseApp
from ktem.pages.admin import AdminPage
from ktem.pages.chat import ChatPage from ktem.pages.chat import ChatPage
from ktem.pages.help import HelpPage from ktem.pages.help import HelpPage
from ktem.pages.settings import SettingsPage from ktem.pages.settings import SettingsPage
@ -33,6 +34,9 @@ class App(BaseApp):
page = index.get_index_page_ui() page = index.get_index_page_ui()
setattr(self, f"_index_{index.id}", page) setattr(self, f"_index_{index.id}", page)
with gr.Tab("Admin", elem_id="admin-tab"):
self.admin_page = AdminPage(self)
with gr.Tab("Settings", elem_id="settings-tab"): with gr.Tab("Settings", elem_id="settings-tab"):
self.settings_page = SettingsPage(self) self.settings_page = SettingsPage(self)

View File

@ -0,0 +1,48 @@
import gradio as gr
from ktem.app import BasePage
from ktem.db.models import User, engine
from sqlmodel import Session, select
from .user import UserManagement
class AdminPage(BasePage):
def __init__(self, app):
self._app = app
self.on_building_ui()
def on_building_ui(self):
if self._app.f_user_management:
with gr.Tab("User Management", visible=False) as self.user_management_tab:
self.user_management = UserManagement(self._app)
def on_subscribe_public_events(self):
if self._app.f_user_management:
self._app.subscribe_event(
name="onSignIn",
definition={
"fn": self.toggle_user_management,
"inputs": [self._app.user_id],
"outputs": [self.user_management_tab],
"show_progress": "hidden",
},
)
self._app.subscribe_event(
name="onSignOut",
definition={
"fn": self.toggle_user_management,
"inputs": [self._app.user_id],
"outputs": [self.user_management_tab],
"show_progress": "hidden",
},
)
def toggle_user_management(self, user_id):
"""Show/hide the user management, depending on the user's role"""
with Session(engine) as session:
user = session.exec(select(User).where(User.id == user_id)).first()
if user and user.admin:
return gr.update(visible=True)
return gr.update(visible=False)

View File

@ -0,0 +1,377 @@
import hashlib
import gradio as gr
import pandas as pd
from ktem.app import BasePage
from ktem.db.models import User, engine
from sqlmodel import Session, select
from theflow.settings import settings as flowsettings
USERNAME_RULE = """**Username rule:**
- Username is case-insensitive
- Username must be at least 3 characters long
- Username must be at most 32 characters long
- Username must contain only alphanumeric characters and underscores
"""
PASSWORD_RULE = """**Password rule:**
- Password must be at least 8 characters long
- Password must contain at least one uppercase letter
- Password must contain at least one lowercase letter
- Password must contain at least one digit
- Password must contain at least one special character from the following:
^ $ * . [ ] { } ( ) ? - " ! @ # % & / \\ , > < ' : ; | _ ~ + =
"""
def validate_username(usn):
"""Validate that whether username is valid
Args:
usn (str): Username
"""
errors = []
if len(usn) < 3:
errors.append("Username must be at least 3 characters long")
if len(usn) > 32:
errors.append("Username must be at most 32 characters long")
if not usn.strip("_").isalnum():
errors.append(
"Username must contain only alphanumeric characters and underscores"
)
return "; ".join(errors)
def validate_password(pwd, pwd_cnf):
"""Validate that whether password is valid
- Password must be at least 8 characters long
- Password must contain at least one uppercase letter
- Password must contain at least one lowercase letter
- Password must contain at least one digit
- Password must contain at least one special character from the following:
^ $ * . [ ] { } ( ) ? - " ! @ # % & / \\ , > < ' : ; | _ ~ + =
Args:
pwd (str): Password
pwd_cnf (str): Confirm password
Returns:
str: Error message if password is not valid
"""
errors = []
if pwd != pwd_cnf:
errors.append("Password does not match")
if len(pwd) < 8:
errors.append("Password must be at least 8 characters long")
if not any(c.isupper() for c in pwd):
errors.append("Password must contain at least one uppercase letter")
if not any(c.islower() for c in pwd):
errors.append("Password must contain at least one lowercase letter")
if not any(c.isdigit() for c in pwd):
errors.append("Password must contain at least one digit")
special_chars = "^$*.[]{}()?-\"!@#%&/\\,><':;|_~+="
if not any(c in special_chars for c in pwd):
errors.append(
"Password must contain at least one special character from the "
f"following: {special_chars}"
)
if errors:
return "; ".join(errors)
return ""
class UserManagement(BasePage):
def __init__(self, app):
self._app = app
self.selected_panel_false = "Selected user: (please select above)"
self.selected_panel_true = "Selected user: {name}"
self.on_building_ui()
if hasattr(flowsettings, "KH_FEATURE_USER_MANAGEMENT_ADMIN") and hasattr(
flowsettings, "KH_FEATURE_USER_MANAGEMENT_PASSWORD"
):
usn = flowsettings.KH_FEATURE_USER_MANAGEMENT_ADMIN
pwd = flowsettings.KH_FEATURE_USER_MANAGEMENT_PASSWORD
with Session(engine) as session:
statement = select(User).where(User.username_lower == usn.lower())
result = session.exec(statement).all()
if result:
print(f'User "{usn}" already exists')
else:
hashed_password = hashlib.sha256(pwd.encode()).hexdigest()
user = User(
username=usn,
username_lower=usn.lower(),
password=hashed_password,
admin=True,
)
session.add(user)
session.commit()
gr.Info(f'User "{usn}" created successfully')
def on_building_ui(self):
with gr.Accordion(label="Create user", open=False):
self.usn_new = gr.Textbox(label="Username", interactive=True)
self.pwd_new = gr.Textbox(
label="Password", type="password", interactive=True
)
self.pwd_cnf_new = gr.Textbox(
label="Confirm password", type="password", interactive=True
)
with gr.Row():
gr.Markdown(USERNAME_RULE)
gr.Markdown(PASSWORD_RULE)
self.btn_new = gr.Button("Create user")
gr.Markdown("## User list")
self.btn_list_user = gr.Button("Refresh user list")
self.state_user_list = gr.State(value=None)
self.user_list = gr.DataFrame(
headers=["id", "name", "admin"],
interactive=False,
)
with gr.Row():
self.selected_user_id = gr.State(value=None)
self.selected_panel = gr.Markdown(self.selected_panel_false)
self.deselect_button = gr.Button("Deselect", visible=False)
with gr.Group():
self.btn_delete = gr.Button("Delete user")
with gr.Row():
self.btn_delete_yes = gr.Button("Confirm", visible=False)
self.btn_delete_no = gr.Button("Cancel", visible=False)
gr.Markdown("## User details")
self.usn_edit = gr.Textbox(label="Username")
self.pwd_edit = gr.Textbox(label="Password", type="password")
self.pwd_cnf_edit = gr.Textbox(label="Confirm password", type="password")
self.admin_edit = gr.Checkbox(label="Admin")
self.btn_edit_save = gr.Button("Save")
def on_register_events(self):
self.btn_new.click(
self.create_user,
inputs=[self.usn_new, self.pwd_new, self.pwd_cnf_new],
outputs=None,
)
self.btn_list_user.click(
self.list_users, inputs=None, outputs=[self.state_user_list, self.user_list]
)
self.user_list.select(
self.select_user,
inputs=self.user_list,
outputs=[self.selected_user_id, self.selected_panel],
show_progress="hidden",
)
self.selected_panel.change(
self.on_selected_user_change,
inputs=[self.selected_user_id],
outputs=[
self.deselect_button,
# delete section
self.btn_delete,
self.btn_delete_yes,
self.btn_delete_no,
# edit section
self.usn_edit,
self.pwd_edit,
self.pwd_cnf_edit,
self.admin_edit,
],
show_progress="hidden",
)
self.deselect_button.click(
lambda: (None, self.selected_panel_false),
inputs=None,
outputs=[self.selected_user_id, self.selected_panel],
show_progress="hidden",
)
self.btn_delete.click(
self.on_btn_delete_click,
inputs=[self.selected_user_id],
outputs=[self.btn_delete, self.btn_delete_yes, self.btn_delete_no],
show_progress="hidden",
)
self.btn_delete_yes.click(
self.delete_user,
inputs=[self.selected_user_id],
outputs=[self.selected_user_id, self.selected_panel],
show_progress="hidden",
)
self.btn_delete_no.click(
lambda: (
gr.update(visible=True),
gr.update(visible=False),
gr.update(visible=False),
),
inputs=None,
outputs=[self.btn_delete, self.btn_delete_yes, self.btn_delete_no],
show_progress="hidden",
)
self.btn_edit_save.click(
self.save_user,
inputs=[
self.selected_user_id,
self.usn_edit,
self.pwd_edit,
self.pwd_cnf_edit,
self.admin_edit,
],
outputs=None,
show_progress="hidden",
)
def create_user(self, usn, pwd, pwd_cnf):
errors = validate_username(usn)
if errors:
gr.Warning(errors)
return
errors = validate_password(pwd, pwd_cnf)
print(errors)
if errors:
gr.Warning(errors)
return
with Session(engine) as session:
statement = select(User).where(User.username_lower == usn.lower())
result = session.exec(statement).all()
if result:
gr.Warning(f'Username "{usn}" already exists')
return
hashed_password = hashlib.sha256(pwd.encode()).hexdigest()
user = User(
username=usn, username_lower=usn.lower(), password=hashed_password
)
session.add(user)
session.commit()
gr.Info(f'User "{usn}" created successfully')
def list_users(self):
with Session(engine) as session:
statement = select(User)
results = [
{"id": user.id, "username": user.username, "admin": user.admin}
for user in session.exec(statement).all()
]
if results:
user_list = pd.DataFrame.from_records(results)
else:
user_list = pd.DataFrame.from_records(
[{"id": "-", "username": "-", "admin": "-"}]
)
return results, user_list
def select_user(self, user_list, ev: gr.SelectData):
if ev.value == "-" and ev.index[0] == 0:
gr.Info("No user is loaded. Please refresh the user list")
return None, self.selected_panel_false
if not ev.selected:
return None, self.selected_panel_false
return user_list["id"][ev.index[0]], self.selected_panel_true.format(
name=user_list["username"][ev.index[0]]
)
def on_selected_user_change(self, selected_user_id):
if selected_user_id is None:
deselect_button = gr.update(visible=False)
btn_delete = gr.update(visible=True)
btn_delete_yes = gr.update(visible=False)
btn_delete_no = gr.update(visible=False)
usn_edit = gr.update(value="")
pwd_edit = gr.update(value="")
pwd_cnf_edit = gr.update(value="")
admin_edit = gr.update(value=False)
else:
deselect_button = gr.update(visible=True)
btn_delete = gr.update(visible=True)
btn_delete_yes = gr.update(visible=False)
btn_delete_no = gr.update(visible=False)
with Session(engine) as session:
statement = select(User).where(User.id == int(selected_user_id))
user = session.exec(statement).one()
usn_edit = gr.update(value=user.username)
pwd_edit = gr.update(value="")
pwd_cnf_edit = gr.update(value="")
admin_edit = gr.update(value=user.admin)
return (
deselect_button,
btn_delete,
btn_delete_yes,
btn_delete_no,
usn_edit,
pwd_edit,
pwd_cnf_edit,
admin_edit,
)
def on_btn_delete_click(self, selected_user_id):
if selected_user_id is None:
gr.Warning("No user is selected")
btn_delete = gr.update(visible=True)
btn_delete_yes = gr.update(visible=False)
btn_delete_no = gr.update(visible=False)
return
btn_delete = gr.update(visible=False)
btn_delete_yes = gr.update(visible=True)
btn_delete_no = gr.update(visible=True)
return btn_delete, btn_delete_yes, btn_delete_no
def save_user(self, selected_user_id, usn, pwd, pwd_cnf, admin):
if usn:
errors = validate_username(usn)
if errors:
gr.Warning(errors)
return
if pwd:
errors = validate_password(pwd, pwd_cnf)
if errors:
gr.Warning(errors)
return
with Session(engine) as session:
statement = select(User).where(User.id == int(selected_user_id))
user = session.exec(statement).one()
user.username = usn
user.username_lower = usn.lower()
user.admin = admin
if pwd:
user.password = hashlib.sha256(pwd.encode()).hexdigest()
session.commit()
gr.Info(f'User "{usn}" updated successfully')
def delete_user(self, selected_user_id):
with Session(engine) as session:
statement = select(User).where(User.id == int(selected_user_id))
user = session.exec(statement).one()
session.delete(user)
session.commit()
gr.Info(f'User "{user.username}" deleted successfully')
return None, self.selected_panel_false

View File

@ -71,16 +71,6 @@ class ConversationControl(BasePage):
}, },
) )
self._app.subscribe_event(
name="onCreateUser",
definition={
"fn": self.reload_conv,
"inputs": [self._app.user_id],
"outputs": [self.conversation],
"show_progress": "hidden",
},
)
def on_register_events(self): def on_register_events(self):
self.conversation_new_btn.click( self.conversation_new_btn.click(
self.new_conv, self.new_conv,

View File

@ -49,7 +49,7 @@ class SettingsPage(BasePage):
name of the setting in the `app.default_settings` name of the setting in the `app.default_settings`
""" """
public_events = ["onSignIn", "onSignOut", "onCreateUser"] public_events = ["onSignIn", "onSignOut"]
def __init__(self, app): def __init__(self, app):
"""Initiate the page and render the UI""" """Initiate the page and render the UI"""
@ -68,7 +68,7 @@ class SettingsPage(BasePage):
def on_building_ui(self): def on_building_ui(self):
self.setting_save_btn = gr.Button("Save settings") self.setting_save_btn = gr.Button("Save settings")
if not self._app.dev_mode: if self._app.f_user_management:
with gr.Tab("User settings"): with gr.Tab("User settings"):
self.user_tab() self.user_tab()
with gr.Tab("General application settings"): with gr.Tab("General application settings"):
@ -93,7 +93,7 @@ class SettingsPage(BasePage):
outputs=list(self._reasoning_mode.values()), outputs=list(self._reasoning_mode.values()),
show_progress="hidden", show_progress="hidden",
) )
if not self._app.dev_mode: if self._app.f_user_management:
self.password_change_btn.click( self.password_change_btn.click(
self.change_password, self.change_password,
inputs=[ inputs=[
@ -137,31 +137,6 @@ class SettingsPage(BasePage):
for event in self._app.get_event("onSignIn"): for event in self._app.get_event("onSignIn"):
onSignInSubmit = onSignInSubmit.then(**event) onSignInSubmit = onSignInSubmit.then(**event)
onCreateUserClick = self.create_btn.click(
self.create_user,
inputs=[
self.username_new,
self.password_new,
self.password_new_confirm,
],
outputs=[
self._user_id,
self.username_new,
self.password_new,
self.password_new_confirm,
]
+ self.signed_in_state()
+ [self.user_out_state],
show_progress="hidden",
).then(
self.load_setting,
inputs=self._user_id,
outputs=[self._settings_state] + self.components(),
show_progress="hidden",
)
for event in self._app.get_event("onCreateUser"):
onCreateUserClick = onCreateUserClick.then(**event)
onSignOutClick = self.signout.click( onSignOutClick = self.signout.click(
self.sign_out, self.sign_out,
inputs=None, inputs=None,
@ -179,8 +154,7 @@ class SettingsPage(BasePage):
onSignOutClick = onSignOutClick.then(**event) onSignOutClick = onSignOutClick.then(**event)
def user_tab(self): def user_tab(self):
with gr.Row() as self.user_out_state: with gr.Column() as self.user_out_state:
with gr.Column():
gr.Markdown("Sign in") gr.Markdown("Sign in")
self.username = gr.Textbox(label="Username", interactive=True) self.username = gr.Textbox(label="Username", interactive=True)
self.password = gr.Textbox( self.password = gr.Textbox(
@ -188,17 +162,6 @@ class SettingsPage(BasePage):
) )
self.signin = gr.Button("Login") self.signin = gr.Button("Login")
with gr.Column():
gr.Markdown("Create new account")
self.username_new = gr.Textbox(label="Username", interactive=True)
self.password_new = gr.Textbox(
label="Password", type="password", interactive=True
)
self.password_new_confirm = gr.Textbox(
label="Confirm password", type="password", interactive=True
)
self.create_btn = gr.Button("Create account")
# user management # user management
self.current_name = gr.Markdown("Current user: ___", visible=False) self.current_name = gr.Markdown("Current user: ___", visible=False)
self.signout = gr.Button("Logout", visible=False) self.signout = gr.Button("Logout", visible=False)
@ -213,17 +176,6 @@ class SettingsPage(BasePage):
"Change password", interactive=True, visible=False "Change password", interactive=True, visible=False
) )
def signed_out_state(self):
return [
self.username,
self.password,
self.signin,
self.username_new,
self.password_new,
self.password_new_confirm,
self.create_btn,
]
def signed_in_state(self): def signed_in_state(self):
return [ return [
self.current_name, # always the first one self.current_name, # always the first one
@ -238,7 +190,7 @@ class SettingsPage(BasePage):
user_id, clear_username, clear_password = None, username, password user_id, clear_username, clear_password = None, username, password
with Session(engine) as session: with Session(engine) as session:
statement = select(User).where( statement = select(User).where(
User.username == username, User.username_lower == username.lower(),
User.password == hashed_password, User.password == hashed_password,
) )
result = session.exec(statement).all() result = session.exec(statement).all()
@ -263,42 +215,6 @@ class SettingsPage(BasePage):
return output return output
def create_user(self, username, password, password_confirm):
user_id, usn, pwd, pwdc = None, username, password, password_confirm
if password != password_confirm:
gr.Warning("Password does not match")
else:
with Session(engine) as session:
statement = select(User).where(
User.username == username,
)
result = session.exec(statement).all()
if result:
gr.Warning(f'Username "{username}" already exists')
else:
hashed_password = hashlib.sha256(password.encode()).hexdigest()
user = User(username=username, password=hashed_password)
session.add(user)
session.commit()
user_id = user.id
usn, pwd, pwdc = "", "", ""
print(user_id)
output: list = [user_id, usn, pwd, pwdc]
if user_id is not None:
output.append(gr.update(visible=True, value=f"Current user: {username}"))
output += [
gr.update(visible=True) for _ in range(len(self.signed_in_state()) - 1)
]
output.append(gr.update(visible=False))
else:
output += [
gr.update(visible=False) for _ in range(len(self.signed_in_state()))
]
output.append(gr.update(visible=True))
return output
def sign_out(self): def sign_out(self):
output = [None] output = [None]
output += [gr.update(visible=False) for _ in range(len(self.signed_in_state()))] output += [gr.update(visible=False) for _ in range(len(self.signed_in_state()))]

View File

@ -9,7 +9,7 @@ packages.find.exclude = ["tests*", "env*"]
[project] [project]
name = "ktem" name = "ktem"
version = "0.1.0" version = "0.2.0"
requires-python = ">= 3.10" requires-python = ">= 3.10"
description = "RAG-based Question and Answering Application" description = "RAG-based Question and Answering Application"
dependencies = [ dependencies = [

View File

@ -12,6 +12,8 @@ nav:
- Features: pages/app/features.md - Features: pages/app/features.md
- Index: - Index:
- File index: pages/app/index/file.md - File index: pages/app/index/file.md
- Extension:
- User management: pages/app/ext/user-management.md
- Customize flow logic: pages/app/customize-flows.md - Customize flow logic: pages/app/customize-flows.md
- Customize UI: pages/app/customize-ui.md - Customize UI: pages/app/customize-ui.md
- Functional description: pages/app/functional-description.md - Functional description: pages/app/functional-description.md