"}``
+ as its first message. The proxy validates the JWT, then connects to the
+ upstream terminal server and authenticates with the server's API key.
+ """
+ await ws.accept()
+
+ result = await _resolve_authenticated_connection(ws, server_id)
+ if result is None:
+ return
+ user, connection = result
+
+ base_url = (connection.get('url') or '').rstrip('/')
+ if not base_url:
+ await ws.close(code=4003, reason='Terminal server URL not configured')
+ return
+
+ # Build upstream WebSocket URL (no token in URL)
+ ws_base = base_url.replace('https://', 'wss://').replace('http://', 'ws://')
+
+ # Route through orchestrator policy endpoint if policy_id is set
+ policy_id = connection.get('policy_id')
+ upstream_params = {}
+ # For orchestrator-backed servers, pass user_id
+ upstream_params['user_id'] = user.id
+
+ import urllib.parse
+
+ if policy_id:
+ upstream_url = f'{ws_base}/p/{policy_id}/api/terminals/{session_id}'
+ else:
+ upstream_url = f'{ws_base}/api/terminals/{session_id}'
+ if upstream_params:
+ upstream_url += f'?{urllib.parse.urlencode(upstream_params)}'
+
+ session = aiohttp.ClientSession()
+ try:
+ async with session.ws_connect(upstream_url, ssl=AIOHTTP_CLIENT_SESSION_SSL) as upstream:
+ import asyncio
+ import json as _json
+
+ # First-message auth to upstream terminal server
+ auth_type = connection.get('auth_type', 'bearer')
+ if auth_type == 'bearer':
+ key = connection.get('key', '')
+ await upstream.send_str(_json.dumps({'type': 'auth', 'token': key}))
+
+ async def _client_to_upstream():
+ """Forward client → upstream."""
+ try:
+ while True:
+ msg = await ws.receive()
+ if msg['type'] == 'websocket.disconnect':
+ break
+ elif 'bytes' in msg and msg['bytes']:
+ await upstream.send_bytes(msg['bytes'])
+ elif 'text' in msg and msg['text']:
+ await upstream.send_str(msg['text'])
+ except Exception:
+ pass
+
+ async def _upstream_to_client():
+ """Forward upstream → client."""
+ try:
+ async for msg in upstream:
+ if msg.type == aiohttp.WSMsgType.BINARY:
+ await ws.send_bytes(msg.data)
+ elif msg.type == aiohttp.WSMsgType.TEXT:
+ await ws.send_text(msg.data)
+ elif msg.type in (
+ aiohttp.WSMsgType.CLOSE,
+ aiohttp.WSMsgType.ERROR,
+ ):
+ break
+ except Exception:
+ pass
+
+ await asyncio.gather(
+ _client_to_upstream(),
+ _upstream_to_client(),
+ return_exceptions=True,
+ )
+ except Exception as e:
+ log.exception('Terminal WebSocket proxy error: %s', e)
+ finally:
+ await session.close()
+ try:
+ await ws.close()
+ except Exception:
+ pass
diff --git a/backend/open_webui/routers/tools.py b/backend/open_webui/routers/tools.py
new file mode 100644
index 0000000000000000000000000000000000000000..04d845c3deae7e1a5d3f257c03554891ea8515c1
--- /dev/null
+++ b/backend/open_webui/routers/tools.py
@@ -0,0 +1,919 @@
+import logging
+from pathlib import Path
+from typing import Optional
+import time
+import re
+import aiohttp
+from open_webui.env import AIOHTTP_CLIENT_SESSION_SSL, AIOHTTP_CLIENT_TIMEOUT
+from open_webui.models.groups import Groups
+from pydantic import BaseModel, HttpUrl
+from fastapi import APIRouter, Depends, HTTPException, Request, status
+from sqlalchemy.ext.asyncio import AsyncSession
+from open_webui.internal.db import get_async_session
+
+
+from open_webui.models.oauth_sessions import OAuthSessions
+from open_webui.models.tools import (
+ ToolForm,
+ ToolModel,
+ ToolResponse,
+ ToolUserResponse,
+ ToolAccessResponse,
+ Tools,
+)
+from open_webui.models.access_grants import AccessGrants
+from open_webui.utils.plugin import (
+ load_tool_module_by_id,
+ replace_imports,
+ get_tool_module_from_cache,
+ resolve_valves_schema_options,
+)
+from open_webui.utils.tools import get_tool_specs
+from open_webui.utils.auth import get_admin_user, get_verified_user
+from open_webui.utils.access_control import (
+ has_permission,
+ has_access,
+ filter_allowed_access_grants,
+)
+from open_webui.utils.tools import get_tool_servers
+
+from open_webui.config import CACHE_DIR, BYPASS_ADMIN_ACCESS_CONTROL
+from open_webui.constants import ERROR_MESSAGES
+
+log = logging.getLogger(__name__)
+
+
+router = APIRouter()
+
+
async def get_tool_module(request, tool_id, load_from_db=True):
    """Return the loaded tool module for *tool_id*, using the app-level cache."""
    module, _frontmatter = await get_tool_module_from_cache(request, tool_id, load_from_db)
    return module
+
+
+############################
+# GetTools
+# The danger is not in having tools, but in reaching
+# for the wrong one. Let the choice here be deliberate.
+############################
+
+
@router.get('/', response_model=list[ToolUserResponse])
async def get_tools(
    request: Request,
    user=Depends(get_verified_user),
    db: AsyncSession = Depends(get_async_session),
):
    """List every tool visible to the requesting user.

    Aggregates three sources: locally stored tools, configured OpenAPI tool
    servers, and enabled MCP tool servers. Unless the user is an admin with
    access-control bypass enabled, the combined list is then filtered to
    entries the user owns or has a 'read' grant for.
    """
    tools = []

    # Local Tools (content deferred; only metadata is needed for a listing)
    for tool in await Tools.get_tools(defer_content=True, db=db):
        # The cached module is only consulted to report whether the tool
        # defines per-user valves.
        tool_module = request.app.state.TOOLS.get(tool.id) if hasattr(request.app.state, 'TOOLS') else None
        tools.append(
            ToolUserResponse(
                **{
                    **tool.model_dump(),
                    'has_user_valves': (hasattr(tool_module, 'UserValves') if tool_module else False),
                }
            )
        )

    # OpenAPI Tool Servers
    # Maps synthetic 'server:*' tool ids -> access grants, used when filtering.
    server_access_grants = {}
    for server in await get_tool_servers(request):
        server_idx = server.get('idx', 0)
        connections = request.app.state.config.TOOL_SERVER_CONNECTIONS
        # Guard against stale indices (connections removed since discovery).
        if server_idx >= len(connections):
            log.warning(
                f'Tool server index {server_idx} out of range '
                f'(have {len(connections)} connections), skipping server {server.get("id")}'
            )
            continue
        connection = connections[server_idx]
        server_config = connection.get('config', {})

        server_id = f'server:{server.get("id")}'
        server_access_grants[server_id] = server_config.get('access_grants', [])

        tools.append(
            ToolUserResponse(
                **{
                    'id': server_id,
                    'user_id': server_id,
                    'name': server.get('openapi', {}).get('info', {}).get('title', 'Tool Server'),
                    'meta': {
                        'description': server.get('openapi', {}).get('info', {}).get('description', ''),
                    },
                    'updated_at': int(time.time()),
                    'created_at': int(time.time()),
                }
            )
        )

    # MCP Tool Servers
    for server in request.app.state.config.TOOL_SERVER_CONNECTIONS:
        if server.get('type', 'openapi') == 'mcp' and server.get('config', {}).get('enable'):
            server_id = server.get('info', {}).get('id')
            auth_type = server.get('auth_type', 'none')

            session_token = None
            if auth_type in ('oauth_2.1', 'oauth_2.1_static'):
                # Strip any namespace prefix before the per-user token lookup.
                splits = server_id.split(':')
                server_id = splits[-1] if len(splits) > 1 else server_id

                session_token = await request.app.state.oauth_client_manager.get_oauth_token(
                    user.id, f'mcp:{server_id}'
                )

            server_config = server.get('config', {})

            tool_id = f'server:mcp:{server.get("info", {}).get("id")}'
            server_access_grants[tool_id] = server_config.get('access_grants', [])

            tools.append(
                ToolUserResponse(
                    **{
                        'id': tool_id,
                        'user_id': tool_id,
                        'name': server.get('info', {}).get('name', 'MCP Tool Server'),
                        'meta': {
                            'description': server.get('info', {}).get('description', ''),
                        },
                        'updated_at': int(time.time()),
                        'created_at': int(time.time()),
                        # Only OAuth-based servers expose an auth state flag.
                        **(
                            {
                                'authenticated': session_token is not None,
                            }
                            if auth_type in ('oauth_2.1', 'oauth_2.1_static')
                            else {}
                        ),
                    }
                )
            )

    if user.role == 'admin' and BYPASS_ADMIN_ACCESS_CONTROL:
        # Admin can see all tools
        return tools
    else:
        user_group_ids = {group.id for group in await Groups.get_groups_by_member_id(user.id, db=db)}
        filtered_tools = []
        for tool in tools:
            if tool.user_id == user.id:
                # Owners always see their own tools.
                filtered_tools.append(tool)
            elif str(tool.id).startswith('server:'):
                # Server-backed tools: grants come from the connection config.
                if await has_access(
                    user.id,
                    'read',
                    server_access_grants.get(str(tool.id), []),
                    user_group_ids,
                    db=db,
                ):
                    filtered_tools.append(tool)
            elif await AccessGrants.has_access(
                user_id=user.id,
                resource_type='tool',
                resource_id=tool.id,
                permission='read',
                user_group_ids=user_group_ids,
                db=db,
            ):
                filtered_tools.append(tool)
        return filtered_tools
+
+
+############################
+# GetToolList
+############################
+
+
@router.get('/list', response_model=list[ToolAccessResponse])
async def get_tool_list(user=Depends(get_verified_user), db: AsyncSession = Depends(get_async_session)):
    """List local tools with a per-item ``write_access`` flag for this user."""
    if user.role == 'admin' and BYPASS_ADMIN_ACCESS_CONTROL:
        tools = await Tools.get_tools(defer_content=True, db=db)
    else:
        tools = await Tools.get_tools_by_user_id(user.id, 'read', defer_content=True, db=db)

    user_group_ids = {group.id for group in await Groups.get_groups_by_member_id(user.id, db=db)}

    result = []
    for tool in tools:
        # Write access: bypassing admin, the owner, or an explicit 'write'
        # grant addressed to the user directly ('*' wildcard included) or to
        # one of the user's groups.
        has_write = (
            (user.role == 'admin' and BYPASS_ADMIN_ACCESS_CONTROL)
            or user.id == tool.user_id
            or any(
                g.permission == 'write'
                and (
                    (g.principal_type == 'user' and (g.principal_id == user.id or g.principal_id == '*'))
                    or (g.principal_type == 'group' and g.principal_id in user_group_ids)
                )
                for g in tool.access_grants
            )
        )
        result.append(
            ToolAccessResponse(
                **tool.model_dump(),
                write_access=has_write,
            )
        )
    return result
+
+
+############################
+# LoadFunctionFromLink
+############################
+
+
class LoadUrlForm(BaseModel):
    """Request body for /load/url; HttpUrl enforces a well-formed absolute URL."""

    url: HttpUrl
+
+
def github_url_to_raw_url(url: str) -> str:
    """Convert a github.com 'blob' or 'tree' URL to its raw.githubusercontent.com form.

    - ``.../tree/<branch>/<path>`` (a folder) maps to ``<path>/main.py`` on the
      raw host, since a folder-based tool is expected to expose ``main.py``.
    - ``.../blob/<branch>/<path>`` (a file) maps directly to the raw file.
    - Any other URL is returned unchanged.
    """
    # Handle 'tree' (folder) URLs: append main.py. Normalize the captured path
    # so an empty or trailing-slash path cannot produce a '//' in the result.
    m1 = re.match(r'https://github\.com/([^/]+)/([^/]+)/tree/([^/]+)/(.*)', url)
    if m1:
        org, repo, branch, path = m1.groups()
        path = path.strip('/')
        prefix = f'https://raw.githubusercontent.com/{org}/{repo}/refs/heads/{branch}'
        return f'{prefix}/{path}/main.py' if path else f'{prefix}/main.py'

    # Handle 'blob' (file) URLs: map one-to-one onto the raw host.
    m2 = re.match(r'https://github\.com/([^/]+)/([^/]+)/blob/([^/]+)/(.*)', url)
    if m2:
        org, repo, branch, path = m2.groups()
        return f'https://raw.githubusercontent.com/{org}/{repo}/refs/heads/{branch}/{path}'

    # No match; return as-is
    return url
+
+
@router.post('/load/url', response_model=Optional[dict])
async def load_tool_from_url(request: Request, form_data: LoadUrlForm, user=Depends(get_admin_user)):
    """Fetch tool source code from a URL (admin only).

    GitHub blob/tree URLs are rewritten to their raw equivalents first.
    Returns a dict with the derived tool ``name`` and the fetched ``content``.
    """
    # NOTE: This is NOT a SSRF vulnerability:
    # This endpoint is admin-only (see get_admin_user), meant for *trusted* internal use,
    # and does NOT accept untrusted user input. Access is enforced by authentication.

    url = str(form_data.url)
    if not url:
        raise HTTPException(status_code=400, detail='Please enter a valid URL')

    url = github_url_to_raw_url(url)
    url_parts = url.rstrip('/').split('/')

    # Derive a tool name: use the file stem unless it is a generic entry-point
    # file (main.py / index.py / __init__.py), in which case use the parent
    # path segment; fall back to 'function' for single-segment URLs.
    file_name = url_parts[-1]
    tool_name = (
        file_name[:-3]
        if (file_name.endswith('.py') and (not file_name.startswith(('main.py', 'index.py', '__init__.py'))))
        else url_parts[-2]
        if len(url_parts) > 1
        else 'function'
    )

    try:
        async with aiohttp.ClientSession(
            trust_env=True, timeout=aiohttp.ClientTimeout(total=AIOHTTP_CLIENT_TIMEOUT)
        ) as session:
            async with session.get(
                url, headers={'Content-Type': 'application/json'}, ssl=AIOHTTP_CLIENT_SESSION_SSL
            ) as resp:
                if resp.status != 200:
                    raise HTTPException(status_code=resp.status, detail='Failed to fetch the tool')
                data = await resp.text()
                if not data:
                    raise HTTPException(status_code=400, detail='No data received from the URL')
                return {
                    'name': tool_name,
                    'content': data,
                }
    except HTTPException:
        # Propagate the deliberate HTTP errors above (bad status / empty body)
        # unchanged instead of letting the generic handler rewrap them as 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=ERROR_MESSAGES.DEFAULT(e))
+
+
+############################
+# ExportTools
+############################
+
+
@router.get('/export', response_model=list[ToolModel])
async def export_tools(
    request: Request,
    user=Depends(get_verified_user),
    db: AsyncSession = Depends(get_async_session),
):
    """Export tool definitions.

    Requires admin role or the 'workspace.tools_export' permission. Admins
    with access-control bypass receive every tool; everyone else receives only
    the tools they can read.
    """
    is_admin = user.role == 'admin'
    allowed = is_admin or await has_permission(
        user.id,
        'workspace.tools_export',
        request.app.state.config.USER_PERMISSIONS,
        db=db,
    )
    if not allowed:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.UNAUTHORIZED,
        )

    if is_admin and BYPASS_ADMIN_ACCESS_CONTROL:
        return await Tools.get_tools(db=db)
    return await Tools.get_tools_by_user_id(user.id, 'read', db=db)
+
+
+############################
+# CreateNewTools
+############################
+
+
@router.post('/create', response_model=Optional[ToolResponse])
async def create_new_tools(
    request: Request,
    form_data: ToolForm,
    user=Depends(get_verified_user),
    db: AsyncSession = Depends(get_async_session),
):
    """Create a new local tool from submitted source code.

    Requires admin role or one of the 'workspace.tools' /
    'workspace.tools_import' permissions. The submitted content is loaded as a
    Python module (to validate it and extract specs) before the row is
    inserted; failures surface as 400 errors.
    """
    if user.role != 'admin' and not (
        await has_permission(user.id, 'workspace.tools', request.app.state.config.USER_PERMISSIONS, db=db)
        or await has_permission(
            user.id,
            'workspace.tools_import',
            request.app.state.config.USER_PERMISSIONS,
            db=db,
        )
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.UNAUTHORIZED,
        )

    # The id becomes a module name, so it must be a valid Python identifier.
    if not form_data.id.isidentifier():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail='Only alphanumeric characters and underscores are allowed in the id',
        )

    form_data.id = form_data.id.lower()

    tools = await Tools.get_tool_by_id(form_data.id, db=db)
    if tools is None:
        try:
            # Drop any grants the user is not permitted to hand out.
            form_data.access_grants = await filter_allowed_access_grants(
                request.app.state.config.USER_PERMISSIONS,
                user.id,
                user.role,
                form_data.access_grants,
                'sharing.public_tools',
            )

            form_data.content = replace_imports(form_data.content)
            # Loading validates the code and yields the frontmatter manifest.
            tool_module, frontmatter = await load_tool_module_by_id(form_data.id, content=form_data.content)
            form_data.meta.manifest = frontmatter

            # Cache the loaded module so subsequent requests skip the reload.
            TOOLS = request.app.state.TOOLS
            TOOLS[form_data.id] = tool_module

            specs = get_tool_specs(TOOLS[form_data.id])
            tools = await Tools.insert_new_tool(user.id, form_data, specs, db=db)

            # Ensure the tool's per-id cache directory exists.
            tool_cache_dir = CACHE_DIR / 'tools' / form_data.id
            tool_cache_dir.mkdir(parents=True, exist_ok=True)

            if tools:
                return tools
            else:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=ERROR_MESSAGES.DEFAULT('Error creating tools'),
                )
        except Exception as e:
            log.exception(f'Failed to load the tool by id {form_data.id}: {e}')
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=ERROR_MESSAGES.DEFAULT(str(e)),
            )
    else:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.ID_TAKEN,
        )
+
+
+############################
+# GetToolsById
+############################
+
+
@router.get('/id/{id}', response_model=Optional[ToolAccessResponse])
async def get_tools_by_id(id: str, user=Depends(get_verified_user), db: AsyncSession = Depends(get_async_session)):
    """Return a single tool (with ``write_access`` flag) if the user may read it."""
    tools = await Tools.get_tool_by_id(id, db=db)

    if tools:
        # Readable by any admin, the owner, or via an explicit 'read' grant.
        if (
            user.role == 'admin'
            or tools.user_id == user.id
            or await AccessGrants.has_access(
                user_id=user.id,
                resource_type='tool',
                resource_id=tools.id,
                permission='read',
                db=db,
            )
        ):
            return ToolAccessResponse(
                **tools.model_dump(),
                # NOTE(review): write access honors BYPASS_ADMIN_ACCESS_CONTROL
                # while the read check above admits any admin unconditionally —
                # confirm this asymmetry is intentional.
                write_access=(
                    (user.role == 'admin' and BYPASS_ADMIN_ACCESS_CONTROL)
                    or user.id == tools.user_id
                    or await AccessGrants.has_access(
                        user_id=user.id,
                        resource_type='tool',
                        resource_id=tools.id,
                        permission='write',
                        db=db,
                    )
                ),
            )
        else:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
            )
    else:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )
+
+
+############################
+# UpdateToolsById
+############################
+
+
@router.post('/id/{id}/update', response_model=Optional[ToolModel])
async def update_tools_by_id(
    request: Request,
    id: str,
    form_data: ToolForm,
    user=Depends(get_verified_user),
    db: AsyncSession = Depends(get_async_session),
):
    """Update an existing tool's source, metadata, and access grants.

    Requires ownership, an explicit 'write' grant, or admin role. The new
    content is re-loaded as a module to validate it and refresh the specs.
    """
    tools = await Tools.get_tool_by_id(id, db=db)
    if not tools:
        # NOTE(review): missing tool returns 401 with a NOT_FOUND message;
        # a 404 status would be more conventional — confirm before changing.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Is the user the original creator, in a group with write access, or an admin
    if (
        tools.user_id != user.id
        and not await AccessGrants.has_access(
            user_id=user.id,
            resource_type='tool',
            resource_id=tools.id,
            permission='write',
            db=db,
        )
        and user.role != 'admin'
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.UNAUTHORIZED,
        )

    try:
        form_data.content = replace_imports(form_data.content)
        # Re-load the module: validates the new code and refreshes frontmatter.
        tool_module, frontmatter = await load_tool_module_by_id(id, content=form_data.content)
        form_data.meta.manifest = frontmatter

        # Replace the cached module so subsequent calls use the new code.
        TOOLS = request.app.state.TOOLS
        TOOLS[id] = tool_module

        specs = get_tool_specs(TOOLS[id])

        # Drop any grants the user is not permitted to hand out.
        form_data.access_grants = await filter_allowed_access_grants(
            request.app.state.config.USER_PERMISSIONS,
            user.id,
            user.role,
            form_data.access_grants,
            'sharing.public_tools',
        )

        updated = {
            **form_data.model_dump(exclude={'id'}),
            'specs': specs,
        }

        log.debug(updated)
        tools = await Tools.update_tool_by_id(id, updated, db=db)

        if tools:
            return tools
        else:
            # NOTE(review): this HTTPException is caught by the broad handler
            # below and re-raised with a different detail message.
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=ERROR_MESSAGES.DEFAULT('Error updating tools'),
            )

    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT(str(e)),
        )
+
+
+############################
+# UpdateToolAccessById
+############################
+
+
class ToolAccessGrantsForm(BaseModel):
    """Request body for /id/{id}/access/update: the replacement grant set."""

    # Each dict describes one grant; presumably principal_type/principal_id/
    # permission fields — verify against the AccessGrants model.
    access_grants: list[dict]
+
+
@router.post('/id/{id}/access/update', response_model=Optional[ToolModel])
async def update_tool_access_by_id(
    request: Request,
    id: str,
    form_data: ToolAccessGrantsForm,
    user=Depends(get_verified_user),
    db: AsyncSession = Depends(get_async_session),
):
    """Replace a tool's access grants.

    Requires ownership, an explicit 'write' grant, or admin role. Grants the
    user may not hand out are silently filtered before being stored.
    """
    tools = await Tools.get_tool_by_id(id, db=db)
    if not tools:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if (
        tools.user_id != user.id
        and not await AccessGrants.has_access(
            user_id=user.id,
            resource_type='tool',
            resource_id=tools.id,
            permission='write',
            db=db,
        )
        and user.role != 'admin'
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.UNAUTHORIZED,
        )

    # Drop any grants the user is not permitted to hand out.
    form_data.access_grants = await filter_allowed_access_grants(
        request.app.state.config.USER_PERMISSIONS,
        user.id,
        user.role,
        form_data.access_grants,
        'sharing.public_tools',
    )

    await AccessGrants.set_access_grants('tool', id, form_data.access_grants, db=db)

    # Return the freshly-updated tool so the client sees the new grants.
    return await Tools.get_tool_by_id(id, db=db)
+
+
+############################
+# DeleteToolsById
+############################
+
+
@router.delete('/id/{id}/delete', response_model=bool)
async def delete_tools_by_id(
    request: Request,
    id: str,
    user=Depends(get_verified_user),
    db: AsyncSession = Depends(get_async_session),
):
    """Delete a tool and evict its cached module.

    Requires ownership, an explicit 'write' grant, or admin role.
    """
    tools = await Tools.get_tool_by_id(id, db=db)
    if not tools:
        # NOTE(review): missing tool returns 401 with a NOT_FOUND message;
        # a 404 status would be more conventional — confirm before changing.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if (
        tools.user_id != user.id
        and not await AccessGrants.has_access(
            user_id=user.id,
            resource_type='tool',
            resource_id=tools.id,
            permission='write',
            db=db,
        )
        and user.role != 'admin'
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.UNAUTHORIZED,
        )

    result = await Tools.delete_tool_by_id(id, db=db)
    if result:
        # Keep the in-memory module cache consistent with the database.
        TOOLS = request.app.state.TOOLS
        if id in TOOLS:
            del TOOLS[id]

    return result
+
+
+############################
+# GetToolValves
+############################
+
+
@router.get('/id/{id}/valves', response_model=Optional[dict])
async def get_tools_valves_by_id(
    id: str, user=Depends(get_verified_user), db: AsyncSession = Depends(get_async_session)
):
    """Return a tool's stored admin-level valve values.

    Requires ownership, an explicit 'write' grant, or admin role (valves are
    configuration, so read access alone is not enough).
    """
    tools = await Tools.get_tool_by_id(id, db=db)
    if not tools:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if (
        tools.user_id != user.id
        and not await AccessGrants.has_access(
            user_id=user.id,
            resource_type='tool',
            resource_id=tools.id,
            permission='write',
            db=db,
        )
        and user.role != 'admin'
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    try:
        valves = await Tools.get_tool_valves_by_id(id, db=db)
        return valves
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT(str(e)),
        )
+
+
+############################
+# GetToolValvesSpec
+############################
+
+
@router.get('/id/{id}/valves/spec', response_model=Optional[dict])
async def get_tools_valves_spec_by_id(
    request: Request,
    id: str,
    user=Depends(get_verified_user),
    db: AsyncSession = Depends(get_async_session),
):
    """Return the JSON schema of a tool's admin-level ``Valves`` model.

    Requires ownership, an explicit 'write' grant, or admin role. Returns
    ``None`` when the tool module defines no ``Valves`` class.
    """
    tools = await Tools.get_tool_by_id(id, db=db)
    if not tools:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if (
        tools.user_id != user.id
        and not await AccessGrants.has_access(
            user_id=user.id,
            resource_type='tool',
            resource_id=tools.id,
            permission='write',
            db=db,
        )
        and user.role != 'admin'
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    # Load (and cache) the tool module so its Valves class can be inspected.
    if id in request.app.state.TOOLS:
        tools_module = request.app.state.TOOLS[id]
    else:
        tools_module, _ = await load_tool_module_by_id(id)
        request.app.state.TOOLS[id] = tools_module

    if hasattr(tools_module, 'Valves'):
        Valves = tools_module.Valves
        # Pydantic v2: .schema() is deprecated; model_json_schema() is the
        # supported equivalent (the file already uses v2's model_dump()).
        schema = Valves.model_json_schema()
        # Resolve dynamic options for select dropdowns
        schema = resolve_valves_schema_options(Valves, schema, user)
        return schema
    return None
+
+
+############################
+# UpdateToolValves
+############################
+
+
@router.post('/id/{id}/valves/update', response_model=Optional[dict])
async def update_tools_valves_by_id(
    request: Request,
    id: str,
    form_data: dict,
    user=Depends(get_verified_user),
    db: AsyncSession = Depends(get_async_session),
):
    """Validate and store admin-level valve values for a tool.

    Requires ownership, an explicit 'write' grant, or admin role. The payload
    is validated through the tool module's ``Valves`` model before saving.
    """
    tools = await Tools.get_tool_by_id(id, db=db)
    if not tools:
        # NOTE(review): missing tool returns 401 here but access-denied below
        # returns 400 — the status codes look swapped; confirm before changing.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if (
        tools.user_id != user.id
        and not await AccessGrants.has_access(
            user_id=user.id,
            resource_type='tool',
            resource_id=tools.id,
            permission='write',
            db=db,
        )
        and user.role != 'admin'
    ):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    # Load (and cache) the tool module so its Valves class can be used.
    if id in request.app.state.TOOLS:
        tools_module = request.app.state.TOOLS[id]
    else:
        tools_module, _ = await load_tool_module_by_id(id)
        request.app.state.TOOLS[id] = tools_module

    if not hasattr(tools_module, 'Valves'):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )
    Valves = tools_module.Valves

    try:
        # None values are dropped so omitted fields fall back to model defaults.
        form_data = {k: v for k, v in form_data.items() if v is not None}
        valves = Valves(**form_data)
        valves_dict = valves.model_dump(exclude_unset=True)
        await Tools.update_tool_valves_by_id(id, valves_dict, db=db)
        return valves_dict
    except Exception as e:
        log.exception(f'Failed to update tool valves by id {id}: {e}')
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT(str(e)),
        )
+
+
+############################
+# ToolUserValves
+############################
+
+
@router.get('/id/{id}/valves/user', response_model=Optional[dict])
async def get_tools_user_valves_by_id(
    id: str, user=Depends(get_verified_user), db: AsyncSession = Depends(get_async_session)
):
    """Return the requesting user's own valve values for a tool.

    Requires ownership, an explicit 'read' grant, or admin role — user valves
    are per-user, so read access is sufficient.
    """
    tools = await Tools.get_tool_by_id(id, db=db)
    if not tools:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if (
        tools.user_id != user.id
        and not await AccessGrants.has_access(
            user_id=user.id,
            resource_type='tool',
            resource_id=tools.id,
            permission='read',
            db=db,
        )
        and user.role != 'admin'
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    try:
        user_valves = await Tools.get_user_valves_by_id_and_user_id(id, user.id, db=db)
        return user_valves
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT(str(e)),
        )
+
+
@router.get('/id/{id}/valves/user/spec', response_model=Optional[dict])
async def get_tools_user_valves_spec_by_id(
    request: Request,
    id: str,
    user=Depends(get_verified_user),
    db: AsyncSession = Depends(get_async_session),
):
    """Return the JSON schema of a tool's per-user ``UserValves`` model.

    Requires ownership, an explicit 'read' grant, or admin role. Returns
    ``None`` when the tool module defines no ``UserValves`` class.
    """
    tools = await Tools.get_tool_by_id(id, db=db)
    if not tools:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if (
        tools.user_id != user.id
        and not await AccessGrants.has_access(
            user_id=user.id,
            resource_type='tool',
            resource_id=tools.id,
            permission='read',
            db=db,
        )
        and user.role != 'admin'
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    # Load (and cache) the tool module so its UserValves class can be inspected.
    if id in request.app.state.TOOLS:
        tools_module = request.app.state.TOOLS[id]
    else:
        tools_module, _ = await load_tool_module_by_id(id)
        request.app.state.TOOLS[id] = tools_module

    if hasattr(tools_module, 'UserValves'):
        UserValves = tools_module.UserValves
        # Pydantic v2: .schema() is deprecated; model_json_schema() is the
        # supported equivalent (the file already uses v2's model_dump()).
        schema = UserValves.model_json_schema()
        # Resolve dynamic options for select dropdowns
        schema = resolve_valves_schema_options(UserValves, schema, user)
        return schema
    return None
+
+
@router.post('/id/{id}/valves/user/update', response_model=Optional[dict])
async def update_tools_user_valves_by_id(
    request: Request,
    id: str,
    form_data: dict,
    user=Depends(get_verified_user),
    db: AsyncSession = Depends(get_async_session),
):
    """Validate and store the requesting user's valve values for a tool.

    Requires ownership, an explicit 'read' grant, or admin role. The payload
    is validated through the tool module's ``UserValves`` model before saving.
    """
    tools = await Tools.get_tool_by_id(id, db=db)
    if not tools:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if (
        tools.user_id != user.id
        and not await AccessGrants.has_access(
            user_id=user.id,
            resource_type='tool',
            resource_id=tools.id,
            permission='read',
            db=db,
        )
        and user.role != 'admin'
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    # Load (and cache) the tool module so its UserValves class can be used.
    if id in request.app.state.TOOLS:
        tools_module = request.app.state.TOOLS[id]
    else:
        tools_module, _ = await load_tool_module_by_id(id)
        request.app.state.TOOLS[id] = tools_module

    if hasattr(tools_module, 'UserValves'):
        UserValves = tools_module.UserValves

        try:
            # None values are dropped so omitted fields fall back to defaults.
            form_data = {k: v for k, v in form_data.items() if v is not None}
            user_valves = UserValves(**form_data)
            user_valves_dict = user_valves.model_dump(exclude_unset=True)
            await Tools.update_user_valves_by_id_and_user_id(id, user.id, user_valves_dict, db=db)
            return user_valves_dict
        except Exception as e:
            log.exception(f'Failed to update user valves by id {id}: {e}')
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=ERROR_MESSAGES.DEFAULT(str(e)),
            )
    else:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )
diff --git a/backend/open_webui/routers/users.py b/backend/open_webui/routers/users.py
new file mode 100644
index 0000000000000000000000000000000000000000..04be89c92f4c6a9d30597e32cfd6cd266e7b8be9
--- /dev/null
+++ b/backend/open_webui/routers/users.py
@@ -0,0 +1,673 @@
+import logging
+from typing import Optional
+from sqlalchemy.ext.asyncio import AsyncSession
+import base64
+import io
+
+
+from fastapi import APIRouter, Depends, HTTPException, Request, status
+from fastapi.responses import Response, StreamingResponse, FileResponse
+from pydantic import BaseModel, ConfigDict
+
+
+from open_webui.models.auths import Auths
+from open_webui.models.oauth_sessions import OAuthSessions
+
+from open_webui.models.groups import Groups
+
+from open_webui.models.users import (
+ UserModel,
+ UserGroupIdsModel,
+ UserGroupIdsListResponse,
+ UserInfoResponse,
+ UserInfoListResponse,
+ UserRoleUpdateForm,
+ UserStatus,
+ Users,
+ UserSettings,
+ UserUpdateForm,
+)
+
+from open_webui.constants import ERROR_MESSAGES
+from open_webui.env import STATIC_DIR
+from open_webui.internal.db import get_async_session
+
+
+from open_webui.utils.auth import (
+ get_admin_user,
+ get_password_hash,
+ get_verified_user,
+ validate_password,
+)
+from open_webui.utils.access_control import get_permissions, has_permission
+from open_webui.socket.main import disconnect_user_sessions
+
+log = logging.getLogger(__name__)
+
+router = APIRouter()
+
+
+############################
+# GetUsers
+# Paginated listing, search, and sorting endpoints for the admin user table.
+############################
+
+
+PAGE_ITEM_COUNT = 30
+
+
+@router.get('/', response_model=UserGroupIdsListResponse)
+async def get_users(
+ query: Optional[str] = None,
+ order_by: Optional[str] = None,
+ direction: Optional[str] = None,
+ page: Optional[int] = 1,
+ user=Depends(get_admin_user),
+ db: AsyncSession = Depends(get_async_session),
+):
+ limit = PAGE_ITEM_COUNT
+
+ page = max(1, page)
+ skip = (page - 1) * limit
+
+ filter = {}
+ if query:
+ filter['query'] = query
+ if order_by:
+ filter['order_by'] = order_by
+ if direction:
+ filter['direction'] = direction
+
+ filter['direction'] = direction
+
+ result = await Users.get_users(filter=filter, skip=skip, limit=limit, db=db)
+
+ users = result['users']
+ total = result['total']
+
+ # Fetch groups for all users in a single query to avoid N+1
+ user_ids = [user.id for user in users]
+ user_groups = await Groups.get_groups_by_member_ids(user_ids, db=db)
+
+ return {
+ 'users': [
+ UserGroupIdsModel(
+ **{
+ **user.model_dump(),
+ 'group_ids': [group.id for group in user_groups.get(user.id, [])],
+ }
+ )
+ for user in users
+ ],
+ 'total': total,
+ }
+
+
+@router.get('/all', response_model=UserInfoListResponse)
+async def get_all_users(
+ user=Depends(get_admin_user),
+ db: AsyncSession = Depends(get_async_session),
+):
+ return await Users.get_users(db=db)
+
+
+@router.get('/search', response_model=UserInfoListResponse)
+async def search_users(
+ query: Optional[str] = None,
+ order_by: Optional[str] = None,
+ direction: Optional[str] = None,
+ page: Optional[int] = 1,
+ user=Depends(get_verified_user),
+ db: AsyncSession = Depends(get_async_session),
+):
+ limit = PAGE_ITEM_COUNT
+
+ page = max(1, page)
+ skip = (page - 1) * limit
+
+ filter = {}
+ if query:
+ filter['query'] = query
+ if order_by:
+ filter['order_by'] = order_by
+ if direction:
+ filter['direction'] = direction
+
+ return await Users.get_users(filter=filter, skip=skip, limit=limit, db=db)
+
+
+############################
+# User Groups
+############################
+
+
+@router.get('/groups')
+async def get_user_groups(user=Depends(get_verified_user), db: AsyncSession = Depends(get_async_session)):
+ return await Groups.get_groups_by_member_id(user.id, db=db)
+
+
+############################
+# User Permissions
+############################
+
+
+@router.get('/permissions')
+async def get_user_permissisions(
+ request: Request,
+ user=Depends(get_verified_user),
+ db: AsyncSession = Depends(get_async_session),
+):
+ user_permissions = await get_permissions(user.id, request.app.state.config.USER_PERMISSIONS, db=db)
+
+ return user_permissions
+
+
+############################
+# User Default Permissions
+############################
+# Workspace-feature permission defaults (everything off unless granted).
+class WorkspacePermissions(BaseModel):
+    models: bool = False
+    knowledge: bool = False
+    prompts: bool = False
+    tools: bool = False
+    skills: bool = False
+    # Per-resource import/export toggles.
+    models_import: bool = False
+    models_export: bool = False
+    prompts_import: bool = False
+    prompts_export: bool = False
+    tools_import: bool = False
+    tools_export: bool = False
+
+
+# Sharing permission defaults. NOTE(review): public_tools and public_notes
+# default to True while every other public_* flag defaults to False —
+# looks intentional but confirm against the admin UI defaults.
+class SharingPermissions(BaseModel):
+    models: bool = False
+    public_models: bool = False
+    knowledge: bool = False
+    public_knowledge: bool = False
+    prompts: bool = False
+    public_prompts: bool = False
+    tools: bool = False
+    public_tools: bool = True
+    skills: bool = False
+    public_skills: bool = False
+    notes: bool = False
+    public_notes: bool = True
+
+
+# Whether users may grant other users access to their resources.
+class AccessGrantsPermissions(BaseModel):
+    allow_users: bool = True
+
+
+# Per-chat capability defaults (most features enabled by default).
+class ChatPermissions(BaseModel):
+    controls: bool = True
+    valves: bool = True
+    system_prompt: bool = True
+    params: bool = True
+    file_upload: bool = True
+    web_upload: bool = True
+    delete: bool = True
+    delete_message: bool = True
+    continue_response: bool = True
+    regenerate_response: bool = True
+    rate_response: bool = True
+    edit: bool = True
+    share: bool = True
+    export: bool = True
+    stt: bool = True
+    tts: bool = True
+    call: bool = True
+    multiple_models: bool = True
+    temporary: bool = True
+    # When True, every chat is forced to be temporary.
+    temporary_enforced: bool = False
+
+
+# Application-feature permission defaults.
+class FeaturesPermissions(BaseModel):
+    api_keys: bool = False
+    notes: bool = True
+    channels: bool = True
+    folders: bool = True
+    direct_tool_servers: bool = False
+
+    web_search: bool = True
+    image_generation: bool = True
+    code_interpreter: bool = True
+    memories: bool = True
+    automations: bool = False
+
+
+# Which settings pages the user may access.
+class SettingsPermissions(BaseModel):
+    interface: bool = True
+
+
+# Aggregate of all permission sections; mirrors the USER_PERMISSIONS config keys.
+class UserPermissions(BaseModel):
+    workspace: WorkspacePermissions
+    sharing: SharingPermissions
+    access_grants: AccessGrantsPermissions
+    chat: ChatPermissions
+    features: FeaturesPermissions
+    settings: SettingsPermissions
+
+
+@router.get('/default/permissions', response_model=UserPermissions)
+async def get_default_user_permissions(request: Request, user=Depends(get_admin_user)):
+ return {
+ 'workspace': WorkspacePermissions(**request.app.state.config.USER_PERMISSIONS.get('workspace', {})),
+ 'sharing': SharingPermissions(**request.app.state.config.USER_PERMISSIONS.get('sharing', {})),
+ 'access_grants': AccessGrantsPermissions(**request.app.state.config.USER_PERMISSIONS.get('access_grants', {})),
+ 'chat': ChatPermissions(**request.app.state.config.USER_PERMISSIONS.get('chat', {})),
+ 'features': FeaturesPermissions(**request.app.state.config.USER_PERMISSIONS.get('features', {})),
+ 'settings': SettingsPermissions(**request.app.state.config.USER_PERMISSIONS.get('settings', {})),
+ }
+
+
+@router.post('/default/permissions')
+async def update_default_user_permissions(request: Request, form_data: UserPermissions, user=Depends(get_admin_user)):
+ request.app.state.config.USER_PERMISSIONS = form_data.model_dump()
+ return request.app.state.config.USER_PERMISSIONS
+
+
+############################
+# GetUserSettingsBySessionUser
+############################
+
+
+@router.get('/user/settings', response_model=Optional[UserSettings])
+async def get_user_settings_by_session_user(
+ user=Depends(get_verified_user), db: AsyncSession = Depends(get_async_session)
+):
+ # user already fetched by get_verified_user — no need to refetch
+ return user.settings
+
+
+############################
+# UpdateUserSettingsBySessionUser
+############################
+
+
+@router.post('/user/settings/update', response_model=UserSettings)
+async def update_user_settings_by_session_user(
+ request: Request,
+ form_data: UserSettings,
+ user=Depends(get_verified_user),
+ db: AsyncSession = Depends(get_async_session),
+):
+ updated_user_settings = form_data.model_dump()
+ ui_settings = updated_user_settings.get('ui')
+ if (
+ user.role != 'admin'
+ and ui_settings is not None
+ and 'toolServers' in ui_settings.keys()
+ and not await has_permission(
+ user.id,
+ 'features.direct_tool_servers',
+ request.app.state.config.USER_PERMISSIONS,
+ )
+ ):
+ # If the user is not an admin and does not have permission to use tool servers, remove the key
+ updated_user_settings['ui'].pop('toolServers', None)
+
+ user = await Users.update_user_settings_by_id(user.id, updated_user_settings, db=db)
+ if user:
+ return user.settings
+ else:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail=ERROR_MESSAGES.USER_NOT_FOUND,
+ )
+
+
+############################
+# GetUserStatusBySessionUser
+############################
+
+
+@router.get('/user/status')
+async def get_user_status_by_session_user(
+ request: Request,
+ user=Depends(get_verified_user),
+ db: AsyncSession = Depends(get_async_session),
+):
+ if not request.app.state.config.ENABLE_USER_STATUS:
+ raise HTTPException(
+ status_code=status.HTTP_403_FORBIDDEN,
+ detail=ERROR_MESSAGES.ACTION_PROHIBITED,
+ )
+ # user already fetched by get_verified_user — no need to refetch
+ return user
+
+
+############################
+# UpdateUserStatusBySessionUser
+############################
+
+
+@router.post('/user/status/update')
+async def update_user_status_by_session_user(
+ request: Request,
+ form_data: UserStatus,
+ user=Depends(get_verified_user),
+ db: AsyncSession = Depends(get_async_session),
+):
+ if not request.app.state.config.ENABLE_USER_STATUS:
+ raise HTTPException(
+ status_code=status.HTTP_403_FORBIDDEN,
+ detail=ERROR_MESSAGES.ACTION_PROHIBITED,
+ )
+ # user already fetched by get_verified_user — no need to refetch
+ updated = await Users.update_user_status_by_id(user.id, form_data, db=db)
+ if updated:
+ return updated
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail=ERROR_MESSAGES.USER_NOT_FOUND,
+ )
+
+
+############################
+# GetUserInfoBySessionUser
+############################
+
+
+@router.get('/user/info', response_model=Optional[dict])
+async def get_user_info_by_session_user(user=Depends(get_verified_user), db: AsyncSession = Depends(get_async_session)):
+ # user already fetched by get_verified_user — no need to refetch
+ return user.info
+
+
+############################
+# UpdateUserInfoBySessionUser
+############################
+
+
+@router.post('/user/info/update', response_model=Optional[dict])
+async def update_user_info_by_session_user(
+ form_data: dict, user=Depends(get_verified_user), db: AsyncSession = Depends(get_async_session)
+):
+ # Merges against the auth-time snapshot of user.info. The previous pre-merge
+ # refetch only narrowed (did not eliminate) the lost-update window on concurrent
+ # same-user writes; real safety needs row locking or a version column.
+ existing_info = user.info or {}
+ updated = await Users.update_user_by_id(user.id, {'info': {**existing_info, **form_data}}, db=db)
+ if updated:
+ return updated.info
+ else:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail=ERROR_MESSAGES.USER_NOT_FOUND,
+ )
+
+
+############################
+# GetUserById
+############################
+
+
+# Response model for the admin user-detail endpoint; extends UserStatus with
+# identity, group membership, and a live-session flag.
+class UserActiveResponse(UserStatus):
+    name: str
+    profile_image_url: Optional[str] = None
+    # List of {'id': ..., 'name': ...} entries for the user's groups.
+    groups: Optional[list] = []
+
+    # True when the user has at least one active session.
+    is_active: bool
+    # Pass through any extra fields from the user model dump.
+    model_config = ConfigDict(extra='allow')
+
+
+@router.get('/{user_id}', response_model=UserActiveResponse)
+async def get_user_by_id(user_id: str, user=Depends(get_admin_user), db: AsyncSession = Depends(get_async_session)):
+
+ user = await Users.get_user_by_id(user_id, db=db)
+ if user:
+ groups = await Groups.get_groups_by_member_id(user_id, db=db)
+ return UserActiveResponse(
+ **{
+ **user.model_dump(),
+ 'groups': [{'id': group.id, 'name': group.name} for group in groups],
+ 'is_active': await Users.is_user_active(user_id, db=db),
+ }
+ )
+ else:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail=ERROR_MESSAGES.USER_NOT_FOUND,
+ )
+
+
+@router.get('/{user_id}/info', response_model=UserInfoResponse)
+async def get_user_info_by_id(
+ user_id: str, user=Depends(get_verified_user), db: AsyncSession = Depends(get_async_session)
+):
+ user = await Users.get_user_by_id(user_id, db=db)
+ if user:
+ groups = await Groups.get_groups_by_member_id(user_id, db=db)
+ return UserInfoResponse(
+ **{
+ **user.model_dump(),
+ 'groups': [{'id': group.id, 'name': group.name} for group in groups],
+ 'is_active': await Users.is_user_active(user_id, db=db),
+ }
+ )
+ else:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail=ERROR_MESSAGES.USER_NOT_FOUND,
+ )
+
+
+@router.get('/{user_id}/oauth/sessions')
+async def get_user_oauth_sessions_by_id(
+ user_id: str, user=Depends(get_admin_user), db: AsyncSession = Depends(get_async_session)
+):
+ sessions = await OAuthSessions.get_sessions_by_user_id(user_id, db=db)
+ if sessions and len(sessions) > 0:
+ return sessions
+ else:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail=ERROR_MESSAGES.USER_NOT_FOUND,
+ )
+
+
+############################
+# GetUserProfileImageById
+############################
+
+
+@router.get('/{user_id}/profile/image')
+async def get_user_profile_image_by_id(user_id: str, user=Depends(get_verified_user)):
+ user = await Users.get_user_by_id(user_id)
+ if user:
+ if user.profile_image_url:
+ # check if it's url or base64
+ if user.profile_image_url.startswith('http'):
+ return Response(
+ status_code=status.HTTP_302_FOUND,
+ headers={'Location': user.profile_image_url},
+ )
+ elif user.profile_image_url.startswith('data:image'):
+ try:
+ header, base64_data = user.profile_image_url.split(',', 1)
+ image_data = base64.b64decode(base64_data)
+ image_buffer = io.BytesIO(image_data)
+ media_type = header.split(';')[0].lstrip('data:')
+
+ return StreamingResponse(
+ image_buffer,
+ media_type=media_type,
+ headers={'Content-Disposition': 'inline'},
+ )
+ except Exception as e:
+ pass
+ return FileResponse(f'{STATIC_DIR}/user.png')
+ else:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail=ERROR_MESSAGES.USER_NOT_FOUND,
+ )
+
+
+############################
+# GetUserActiveStatusById
+############################
+
+
+@router.get('/{user_id}/active', response_model=dict)
+async def get_user_active_status_by_id(
+ user_id: str, user=Depends(get_verified_user), db: AsyncSession = Depends(get_async_session)
+):
+ return {
+ 'active': await Users.is_user_active(user_id, db=db),
+ }
+
+
+############################
+# UpdateUserById
+############################
+
+
+@router.post('/{user_id}/update', response_model=Optional[UserModel])
+async def update_user_by_id(
+    user_id: str,
+    form_data: UserUpdateForm,
+    session_user=Depends(get_admin_user),
+    db: AsyncSession = Depends(get_async_session),
+):
+    """Update a user's role/name/email/password/avatar (admin only).
+
+    Guards: the primary (first-created) admin can only be edited by
+    themselves and can never lose the admin role. Raises 400 on unknown
+    user, taken email, or failed update; 403 on prohibited edits; 500 if
+    the primary-admin check itself errors.
+    """
+    # Prevent modification of the primary admin user by other admins
+    try:
+        first_user = await Users.get_first_user(db=db)
+        if first_user:
+            if user_id == first_user.id:
+                if session_user.id != user_id:
+                    # If the user trying to update is the primary admin, and they are not the primary admin themselves
+                    raise HTTPException(
+                        status_code=status.HTTP_403_FORBIDDEN,
+                        detail=ERROR_MESSAGES.ACTION_PROHIBITED,
+                    )
+
+                if form_data.role is not None and form_data.role != 'admin':
+                    # If the primary admin is trying to change their own role, prevent it
+                    raise HTTPException(
+                        status_code=status.HTTP_403_FORBIDDEN,
+                        detail=ERROR_MESSAGES.ACTION_PROHIBITED,
+                    )
+
+    except HTTPException:
+        # Re-raise our own guard responses untouched.
+        raise
+    except Exception as e:
+        log.error(f'Error checking primary admin status: {e}')
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail='Could not verify primary admin status.',
+        )
+
+    user = await Users.get_user_by_id(user_id, db=db)
+
+    if user:
+        # Email uniqueness check; compares lowercased form input against the
+        # stored value — assumes stored emails are lowercase (TODO confirm).
+        if form_data.email is not None and form_data.email.lower() != user.email:
+            email_user = await Users.get_user_by_email(form_data.email.lower(), db=db)
+            if email_user:
+                raise HTTPException(
+                    status_code=status.HTTP_400_BAD_REQUEST,
+                    detail=ERROR_MESSAGES.EMAIL_TAKEN,
+                )
+
+        # Password is validated, hashed, and stored via the auth table.
+        if form_data.password:
+            try:
+                validate_password(form_data.password)
+            except Exception as e:
+                raise HTTPException(400, detail=str(e))
+
+            hashed = get_password_hash(form_data.password)
+            await Auths.update_user_password_by_id(user_id, hashed, db=db)
+
+        # Build update dict from only the provided fields
+        update_data = {}
+        if form_data.role is not None:
+            update_data['role'] = form_data.role
+        if form_data.name is not None:
+            update_data['name'] = form_data.name
+        if form_data.email is not None:
+            update_data['email'] = form_data.email.lower()
+            # Keep the auth record's email in sync with the user record.
+            await Auths.update_email_by_id(user_id, form_data.email.lower(), db=db)
+        if form_data.profile_image_url is not None:
+            update_data['profile_image_url'] = form_data.profile_image_url
+
+        if update_data:
+            updated_user = await Users.update_user_by_id(
+                user_id,
+                update_data,
+            db=db,
+            )
+        else:
+            # Nothing to change (e.g. password-only update): reuse loaded row.
+            updated_user = user
+
+        if updated_user:
+            # If the role changed, disconnect all socket sessions so stale
+            # privileges cached in SESSION_POOL are invalidated.
+            if updated_user.role != user.role:
+                await disconnect_user_sessions(user_id)
+            return updated_user
+
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail=ERROR_MESSAGES.DEFAULT(),
+        )
+
+    raise HTTPException(
+        status_code=status.HTTP_400_BAD_REQUEST,
+        detail=ERROR_MESSAGES.USER_NOT_FOUND,
+    )
+
+
+############################
+# DeleteUserById
+############################
+
+
+@router.delete('/{user_id}', response_model=bool)
+async def delete_user_by_id(user_id: str, user=Depends(get_admin_user), db: AsyncSession = Depends(get_async_session)):
+ # Prevent deletion of the primary admin user
+ try:
+ first_user = await Users.get_first_user(db=db)
+ if first_user and user_id == first_user.id:
+ raise HTTPException(
+ status_code=status.HTTP_403_FORBIDDEN,
+ detail=ERROR_MESSAGES.ACTION_PROHIBITED,
+ )
+ except HTTPException:
+ raise
+ except Exception as e:
+ log.error(f'Error checking primary admin status: {e}')
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail='Could not verify primary admin status.',
+ )
+
+ if user.id != user_id:
+ result = await Auths.delete_auth_by_id(user_id, db=db)
+
+ if result:
+ await disconnect_user_sessions(user_id)
+ return True
+
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=ERROR_MESSAGES.DELETE_USER_ERROR,
+ )
+
+ # Prevent self-deletion
+ raise HTTPException(
+ status_code=status.HTTP_403_FORBIDDEN,
+ detail=ERROR_MESSAGES.ACTION_PROHIBITED,
+ )
+
+
+############################
+# GetUserGroupsById
+############################
+
+
+@router.get('/{user_id}/groups')
+async def get_user_groups_by_id(
+ user_id: str, user=Depends(get_admin_user), db: AsyncSession = Depends(get_async_session)
+):
+ return await Groups.get_groups_by_member_id(user_id, db=db)
diff --git a/backend/open_webui/routers/utils.py b/backend/open_webui/routers/utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..20705c2c44be3fbe5af384099a081739a22c2f70
--- /dev/null
+++ b/backend/open_webui/routers/utils.py
@@ -0,0 +1,123 @@
+import black
+import logging
+import markdown
+
+from open_webui.models.chats import ChatTitleMessagesForm
+from open_webui.config import DATA_DIR, ENABLE_ADMIN_EXPORT
+from open_webui.constants import ERROR_MESSAGES
+from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
+from pydantic import BaseModel
+from starlette.responses import FileResponse
+
+
+from open_webui.utils.misc import get_gravatar_url
+from open_webui.utils.pdf_generator import PDFGenerator
+from open_webui.utils.auth import get_admin_user, get_verified_user
+from open_webui.utils.code_interpreter import execute_code_jupyter
+
+log = logging.getLogger(__name__)
+
+router = APIRouter()
+
+
+@router.get('/gravatar')
+async def get_gravatar(email: str, user=Depends(get_verified_user)):
+ return get_gravatar_url(email)
+
+
+# Request body for the code formatting/execution endpoints.
+class CodeForm(BaseModel):
+    code: str
+
+
+@router.post('/code/format')
+async def format_code(form_data: CodeForm, user=Depends(get_admin_user)):
+ try:
+ formatted_code = black.format_str(form_data.code, mode=black.Mode())
+ return {'code': formatted_code}
+ except black.NothingChanged:
+ return {'code': form_data.code}
+ except Exception as e:
+ raise HTTPException(status_code=400, detail=str(e))
+
+
+@router.post('/code/execute')
+async def execute_code(request: Request, form_data: CodeForm, user=Depends(get_verified_user)):
+ if not request.app.state.config.ENABLE_CODE_EXECUTION:
+ raise HTTPException(
+ status_code=403,
+ detail=ERROR_MESSAGES.FEATURE_DISABLED('Code execution'),
+ )
+
+ if request.app.state.config.CODE_EXECUTION_ENGINE == 'jupyter':
+ output = await execute_code_jupyter(
+ request.app.state.config.CODE_EXECUTION_JUPYTER_URL,
+ form_data.code,
+ (
+ request.app.state.config.CODE_EXECUTION_JUPYTER_AUTH_TOKEN
+ if request.app.state.config.CODE_EXECUTION_JUPYTER_AUTH == 'token'
+ else None
+ ),
+ (
+ request.app.state.config.CODE_EXECUTION_JUPYTER_AUTH_PASSWORD
+ if request.app.state.config.CODE_EXECUTION_JUPYTER_AUTH == 'password'
+ else None
+ ),
+ request.app.state.config.CODE_EXECUTION_JUPYTER_TIMEOUT,
+ )
+
+ return output
+ else:
+ raise HTTPException(
+ status_code=400,
+ detail=ERROR_MESSAGES.DEFAULT('Code execution engine not supported'),
+ )
+
+
+# Request body for the markdown-to-HTML endpoint.
+class MarkdownForm(BaseModel):
+    md: str
+
+
+@router.post('/markdown')
+async def get_html_from_markdown(form_data: MarkdownForm, user=Depends(get_verified_user)):
+ return {'html': markdown.markdown(form_data.md)}
+
+
+# Chat payload (title + raw message dicts).
+# NOTE(review): appears unused in this module — download_chat_as_pdf takes
+# ChatTitleMessagesForm instead. Kept in case other modules import it.
+class ChatForm(BaseModel):
+    title: str
+    messages: list[dict]
+
+
+@router.post('/pdf')
+async def download_chat_as_pdf(form_data: ChatTitleMessagesForm, user=Depends(get_verified_user)):
+ try:
+ pdf_bytes = PDFGenerator(form_data).generate_chat_pdf()
+
+ return Response(
+ content=pdf_bytes,
+ media_type='application/pdf',
+ headers={'Content-Disposition': 'attachment;filename=chat.pdf'},
+ )
+ except Exception as e:
+ log.exception(f'Error generating PDF: {e}')
+ raise HTTPException(status_code=400, detail=str(e))
+
+
+@router.get('/db/download')
+async def download_db(user=Depends(get_admin_user)):
+    """Download the raw database file (admin only).
+
+    Requires ENABLE_ADMIN_EXPORT and a SQLite backend; responds 401 when
+    export is disabled and 400 for non-SQLite engines.
+    """
+    if not ENABLE_ADMIN_EXPORT:
+        raise HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED,
+            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
+        )
+    # Imported lazily inside the handler — presumably to avoid an import
+    # cycle at module load time; TODO confirm.
+    from open_webui.internal.db import engine
+
+    if engine.name != 'sqlite':
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail=ERROR_MESSAGES.DB_NOT_SQLITE,
+        )
+    # Streams the on-disk SQLite file directly.
+    return FileResponse(
+        engine.url.database,
+        media_type='application/octet-stream',
+        filename='webui.db',
+    )
diff --git a/backend/open_webui/socket/main.py b/backend/open_webui/socket/main.py
new file mode 100644
index 0000000000000000000000000000000000000000..e224408742e177a491de2df8113d04eea3361338
--- /dev/null
+++ b/backend/open_webui/socket/main.py
@@ -0,0 +1,969 @@
+import asyncio
+import random
+
+import socketio
+import logging
+import sys
+import time
+from typing import Dict, Set
+from redis import asyncio as aioredis
+import pycrdt as Y
+
+from open_webui.models.users import Users, UserNameResponse
+from open_webui.models.channels import Channels
+from open_webui.models.chats import Chats
+from open_webui.models.notes import Notes, NoteUpdateForm
+from open_webui.utils.redis import (
+ get_sentinels_from_env,
+ get_sentinel_url_from_env,
+)
+
+from open_webui.config import (
+ CORS_ALLOW_ORIGIN,
+)
+
+from open_webui.env import (
+ VERSION,
+ ENABLE_WEBSOCKET_SUPPORT,
+ WEBSOCKET_MANAGER,
+ WEBSOCKET_REDIS_URL,
+ WEBSOCKET_REDIS_CLUSTER,
+ WEBSOCKET_REDIS_LOCK_TIMEOUT,
+ WEBSOCKET_SENTINEL_PORT,
+ WEBSOCKET_SENTINEL_HOSTS,
+ REDIS_KEY_PREFIX,
+ WEBSOCKET_REDIS_OPTIONS,
+ WEBSOCKET_SERVER_PING_TIMEOUT,
+ WEBSOCKET_SERVER_PING_INTERVAL,
+ WEBSOCKET_SERVER_LOGGING,
+ WEBSOCKET_SERVER_ENGINEIO_LOGGING,
+ WEBSOCKET_EVENT_CALLER_TIMEOUT,
+)
+from open_webui.utils.auth import decode_token
+from open_webui.socket.utils import RedisDict, RedisLock, YdocManager
+from open_webui.tasks import create_task, stop_item_tasks
+from open_webui.utils.redis import get_redis_connection
+from open_webui.utils.access_control import has_permission
+from open_webui.models.access_grants import AccessGrants
+
+
+from open_webui.env import (
+ GLOBAL_LOG_LEVEL,
+)
+
+logging.basicConfig(stream=sys.stdout, level=GLOBAL_LOG_LEVEL)
+log = logging.getLogger(__name__)
+
+
+# Global Redis connection handle; initialized below only when the Redis
+# websocket manager is configured.
+REDIS = None
+
+# Configure CORS for Socket.IO
+SOCKETIO_CORS_ORIGINS = '*' if CORS_ALLOW_ORIGIN == ['*'] else CORS_ALLOW_ORIGIN
+
+# Two construction paths for the Socket.IO server: with a Redis-backed client
+# manager (multi-node event fan-out, optionally discovered via Sentinel), or
+# a plain single-process server. Both share the same transport/ping settings.
+if WEBSOCKET_MANAGER == 'redis':
+    if WEBSOCKET_SENTINEL_HOSTS:
+        # Resolve the master through Sentinel rather than a fixed URL.
+        mgr = socketio.AsyncRedisManager(
+            get_sentinel_url_from_env(WEBSOCKET_REDIS_URL, WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT),
+            redis_options=WEBSOCKET_REDIS_OPTIONS,
+        )
+    else:
+        mgr = socketio.AsyncRedisManager(WEBSOCKET_REDIS_URL, redis_options=WEBSOCKET_REDIS_OPTIONS)
+    sio = socketio.AsyncServer(
+        cors_allowed_origins=SOCKETIO_CORS_ORIGINS,
+        async_mode='asgi',
+        # Fall back to long-polling when websocket support is disabled.
+        transports=(['websocket'] if ENABLE_WEBSOCKET_SUPPORT else ['polling']),
+        allow_upgrades=ENABLE_WEBSOCKET_SUPPORT,
+        always_connect=True,
+        client_manager=mgr,
+        logger=WEBSOCKET_SERVER_LOGGING,
+        ping_interval=WEBSOCKET_SERVER_PING_INTERVAL,
+        ping_timeout=WEBSOCKET_SERVER_PING_TIMEOUT,
+        engineio_logger=WEBSOCKET_SERVER_ENGINEIO_LOGGING,
+    )
+else:
+    sio = socketio.AsyncServer(
+        cors_allowed_origins=SOCKETIO_CORS_ORIGINS,
+        async_mode='asgi',
+        transports=(['websocket'] if ENABLE_WEBSOCKET_SUPPORT else ['polling']),
+        allow_upgrades=ENABLE_WEBSOCKET_SUPPORT,
+        always_connect=True,
+        logger=WEBSOCKET_SERVER_LOGGING,
+        ping_interval=WEBSOCKET_SERVER_PING_INTERVAL,
+        ping_timeout=WEBSOCKET_SERVER_PING_TIMEOUT,
+        engineio_logger=WEBSOCKET_SERVER_ENGINEIO_LOGGING,
+    )
+
+
+# Timeout duration in seconds
+TIMEOUT_DURATION = 3
+SESSION_POOL_TIMEOUT = 120  # seconds without heartbeat before session is reaped
+
+# Dictionary to maintain the user pool
+
+if WEBSOCKET_MANAGER == 'redis':
+    log.debug('Using Redis to manage websockets.')
+    REDIS = get_redis_connection(
+        redis_url=WEBSOCKET_REDIS_URL,
+        redis_sentinels=get_sentinels_from_env(WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT),
+        redis_cluster=WEBSOCKET_REDIS_CLUSTER,
+        async_mode=True,
+    )
+
+    redis_sentinels = get_sentinels_from_env(WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT)
+
+    # Redis-backed dict-like stores shared across nodes.
+    MODELS = RedisDict(
+        f'{REDIS_KEY_PREFIX}:models',
+        redis_url=WEBSOCKET_REDIS_URL,
+        redis_sentinels=redis_sentinels,
+        redis_cluster=WEBSOCKET_REDIS_CLUSTER,
+    )
+
+    SESSION_POOL = RedisDict(
+        f'{REDIS_KEY_PREFIX}:session_pool',
+        redis_url=WEBSOCKET_REDIS_URL,
+        redis_sentinels=redis_sentinels,
+        redis_cluster=WEBSOCKET_REDIS_CLUSTER,
+    )
+    USAGE_POOL = RedisDict(
+        f'{REDIS_KEY_PREFIX}:usage_pool',
+        redis_url=WEBSOCKET_REDIS_URL,
+        redis_sentinels=redis_sentinels,
+        redis_cluster=WEBSOCKET_REDIS_CLUSTER,
+    )
+
+    # Distributed locks so only one node runs each periodic cleanup.
+    # NOTE: the 'aquire' spelling mirrors RedisLock's own method names.
+    clean_up_lock = RedisLock(
+        redis_url=WEBSOCKET_REDIS_URL,
+        lock_name=f'{REDIS_KEY_PREFIX}:usage_cleanup_lock',
+        timeout_secs=WEBSOCKET_REDIS_LOCK_TIMEOUT,
+        redis_sentinels=redis_sentinels,
+        redis_cluster=WEBSOCKET_REDIS_CLUSTER,
+    )
+    aquire_func = clean_up_lock.aquire_lock
+    renew_func = clean_up_lock.renew_lock
+    release_func = clean_up_lock.release_lock
+
+    session_cleanup_lock = RedisLock(
+        redis_url=WEBSOCKET_REDIS_URL,
+        lock_name=f'{REDIS_KEY_PREFIX}:session_cleanup_lock',
+        timeout_secs=WEBSOCKET_REDIS_LOCK_TIMEOUT,
+        redis_sentinels=redis_sentinels,
+        redis_cluster=WEBSOCKET_REDIS_CLUSTER,
+    )
+    session_aquire_func = session_cleanup_lock.aquire_lock
+    session_renew_func = session_cleanup_lock.renew_lock
+    session_release_func = session_cleanup_lock.release_lock
+else:
+    # Single-process mode: in-memory pools and no-op "locks".
+    MODELS = {}
+
+    SESSION_POOL = {}
+    USAGE_POOL = {}
+
+    aquire_func = release_func = renew_func = lambda: True
+    session_aquire_func = session_release_func = session_renew_func = lambda: True
+
+
+# Shared-document manager for collaborative notes; persists Y-docs in Redis
+# when available (REDIS is None in single-process mode).
+YDOC_MANAGER = YdocManager(
+    redis=REDIS,
+    redis_key_prefix=f'{REDIS_KEY_PREFIX}:ydoc:documents',
+)
+
+
+async def periodic_session_pool_cleanup():
+    """Reap orphaned SESSION_POOL entries that missed heartbeats (e.g. crashed instance)."""
+    # Single-node sweep: if another node holds the lock, skip this run entirely.
+    if not session_aquire_func():
+        log.debug('Session cleanup lock held by another node. Skipping.')
+        return
+
+    try:
+        while True:
+            if not session_renew_func():
+                log.error('Unable to renew session cleanup lock. Exiting.')
+                return
+
+            now = int(time.time())
+            # Iterate a snapshot of keys since entries are deleted mid-loop.
+            for sid in list(SESSION_POOL.keys()):
+                entry = SESSION_POOL.get(sid)
+                if entry and now - entry.get('last_seen_at', 0) > SESSION_POOL_TIMEOUT:
+                    log.warning(f'Reaping orphaned session {sid} (user {entry.get("id")})')
+                    del SESSION_POOL[sid]
+            # NOTE(review): sleeps SESSION_POOL_TIMEOUT (120s) between renewals;
+            # if WEBSOCKET_REDIS_LOCK_TIMEOUT is shorter, the lock expires
+            # mid-sleep and the next renew fails — confirm the configured values.
+            await asyncio.sleep(SESSION_POOL_TIMEOUT)
+    finally:
+        session_release_func()
+
+
+async def periodic_usage_pool_cleanup():
+ max_retries = 2
+ retry_delay = random.uniform(WEBSOCKET_REDIS_LOCK_TIMEOUT / 2, WEBSOCKET_REDIS_LOCK_TIMEOUT)
+ for attempt in range(max_retries + 1):
+ if aquire_func():
+ break
+ else:
+ if attempt < max_retries:
+ log.debug(f'Cleanup lock already exists. Retry {attempt + 1} after {retry_delay}s...')
+ await asyncio.sleep(retry_delay)
+ else:
+ log.warning('Failed to acquire cleanup lock after retries. Skipping cleanup.')
+ return
+
+ log.debug('Running periodic_cleanup')
+ try:
+ while True:
+ if not renew_func():
+ log.error(f'Unable to renew cleanup lock. Exiting usage pool cleanup.')
+ raise Exception('Unable to renew usage pool cleanup lock.')
+
+ now = int(time.time())
+ send_usage = False
+ for model_id, connections in list(USAGE_POOL.items()):
+ # Creating a list of sids to remove if they have timed out
+ expired_sids = [
+ sid for sid, details in connections.items() if now - details['updated_at'] > TIMEOUT_DURATION
+ ]
+
+ for sid in expired_sids:
+ del connections[sid]
+
+ if not connections:
+ log.debug(f'Cleaning up model {model_id} from usage pool')
+ del USAGE_POOL[model_id]
+ else:
+ USAGE_POOL[model_id] = connections
+
+ send_usage = True
+ await asyncio.sleep(TIMEOUT_DURATION)
+ finally:
+ release_func()
+
+
+# ASGI wrapper around the Socket.IO server; mounted by the main application
+# and served under /ws/socket.io.
+app = socketio.ASGIApp(
+    sio,
+    socketio_path='/ws/socket.io',
+)
+
+
+def get_models_in_use():
+ # List models that are currently in use
+ models_in_use = list(USAGE_POOL.keys())
+ return models_in_use
+
+
+def get_user_id_from_session_pool(sid):
+ user = SESSION_POOL.get(sid)
+ if user:
+ return user['id']
+ return None
+
+
+def get_session_ids_from_room(room):
+ """Get all session IDs from a specific room."""
+ active_session_ids = sio.manager.get_participants(
+ namespace='/',
+ room=room,
+ )
+ return [session_id[0] for session_id in active_session_ids]
+
+
+def get_user_ids_from_room(room):
+ active_session_ids = get_session_ids_from_room(room)
+
+ active_user_ids = list(
+ set(
+ [
+ SESSION_POOL.get(session_id)['id']
+ for session_id in active_session_ids
+ if SESSION_POOL.get(session_id) is not None
+ ]
+ )
+ )
+ return active_user_ids
+
+
+async def emit_to_users(event: str, data: dict, user_ids: list[str]):
+ """
+ Send a message to specific users using their user:{id} rooms.
+
+ Args:
+ event (str): The event name to emit.
+ data (dict): The payload/data to send.
+ user_ids (list[str]): The target users' IDs.
+ """
+ try:
+ for user_id in user_ids:
+ await sio.emit(event, data, room=f'user:{user_id}')
+ except Exception as e:
+ log.debug(f'Failed to emit event {event} to users {user_ids}: {e}')
+
+
+async def enter_room_for_users(room: str, user_ids: list[str]):
+ """
+ Make all sessions of a user join a specific room.
+ Args:
+ room (str): The room to join.
+ user_ids (list[str]): The target user's IDs.
+ """
+ try:
+ for user_id in user_ids:
+ session_ids = get_session_ids_from_room(f'user:{user_id}')
+ for sid in session_ids:
+ await sio.enter_room(sid, room)
+ except Exception as e:
+ log.debug(f'Failed to make users {user_ids} join room {room}: {e}')
+
+
async def disconnect_user_sessions(user_id: str):
    """Disconnect every Socket.IO session that belongs to *user_id*.

    Invoked when a user's role changes or the account is deleted, so that
    stale role/permission data cached in SESSION_POOL is invalidated.
    Reconnecting clients re-authenticate against fresh database state.
    """
    try:
        sids = get_session_ids_from_room(f'user:{user_id}')
        for sid in sids:
            await sio.disconnect(sid)
        if sids:
            log.info(f'Disconnected {len(sids)} session(s) for user {user_id}')
    except Exception as e:
        log.warning(f'Failed to disconnect sessions for user {user_id}: {e}')
+
+
@sio.on('usage')
async def usage(sid, data):
    """Record a model-usage heartbeat from an authenticated session.

    Stores {sid: {'updated_at': ts}} under the model's USAGE_POOL entry.
    The previous implementation raised KeyError on payloads without a
    'model' field (or a None payload); malformed events are now ignored.
    """
    if sid not in SESSION_POOL:
        return

    model_id = (data or {}).get('model')
    if not model_id:
        # Malformed payload — nothing to record.
        return

    # Timestamp for the last update from this session.
    current_time = int(time.time())

    # Merge-then-assign so the write persists even if USAGE_POOL is a
    # Redis-backed mapping (mutating a fetched value would not be stored) —
    # presumed from the RedisDict write-back pattern used elsewhere; confirm.
    USAGE_POOL[model_id] = {
        **(USAGE_POOL[model_id] if model_id in USAGE_POOL else {}),
        sid: {'updated_at': current_time},
    }
+
+
@sio.event
async def connect(sid, environ, auth):
    """Authenticate an incoming socket via its JWT and register the session."""
    if not auth or 'token' not in auth:
        return

    payload = decode_token(auth['token'])
    if payload is None or 'id' not in payload:
        return

    user = await Users.get_user_by_id(payload['id'])
    if not user:
        return

    # Cache a slimmed-down user record for this session; heavy profile
    # fields are excluded.
    excluded_fields = [
        'profile_image_url',
        'profile_banner_image_url',
        'date_of_birth',
        'bio',
        'gender',
    ]
    SESSION_POOL[sid] = {
        **user.model_dump(exclude=excluded_fields),
        'last_seen_at': int(time.time()),
    }
    await sio.enter_room(sid, f'user:{user.id}')
+
+
@sio.on('user-join')
async def user_join(sid, data):
    """Register an authenticated session, join its rooms, return identity."""
    auth = data['auth'] if 'auth' in data else None
    if not auth or 'token' not in auth:
        return

    payload = decode_token(auth['token'])
    if payload is None or 'id' not in payload:
        return

    user = await Users.get_user_by_id(payload['id'])
    if not user:
        return

    # Cache a slimmed-down user record for this session.
    excluded_fields = [
        'profile_image_url',
        'profile_banner_image_url',
        'date_of_birth',
        'bio',
        'gender',
    ]
    SESSION_POOL[sid] = {
        **user.model_dump(exclude=excluded_fields),
        'last_seen_at': int(time.time()),
    }

    await sio.enter_room(sid, f'user:{user.id}')

    # Join all the channels only if user has channels permission
    if user.role == 'admin' or await has_permission(user.id, 'features.channels'):
        channels = await Channels.get_channels_by_user_id(user.id)
        log.debug(f'{channels=}')
        for channel in channels:
            await sio.enter_room(sid, f'channel:{channel.id}')

    return {'id': user.id, 'name': user.name}
+
+
@sio.on('heartbeat')
async def heartbeat(sid, data):
    """Refresh the session's last_seen_at and the user's DB activity stamp."""
    session = SESSION_POOL.get(sid)
    if not session:
        return
    SESSION_POOL[sid] = {**session, 'last_seen_at': int(time.time())}
    await Users.update_last_active_by_id(session['id'])
+
+
@sio.on('join-channels')
async def join_channel(sid, data):
    """Join all channel rooms for an authenticated user with channel access."""
    auth = data['auth'] if 'auth' in data else None
    if not auth or 'token' not in auth:
        return

    payload = decode_token(auth['token'])
    if payload is None or 'id' not in payload:
        return

    user = await Users.get_user_by_id(payload['id'])
    if not user:
        return

    # Channel rooms require the channels feature permission (admins bypass).
    if user.role != 'admin' and not await has_permission(user.id, 'features.channels'):
        return

    channels = await Channels.get_channels_by_user_id(user.id)
    log.debug(f'{channels=}')
    for channel in channels:
        await sio.enter_room(sid, f'channel:{channel.id}')
+
+
@sio.on('join-note')
async def join_note(sid, data):
    """Join the note:{id} room after verifying read access to the note."""
    auth = data['auth'] if 'auth' in data else None
    if not auth or 'token' not in auth:
        return

    token_data = decode_token(auth['token'])
    if token_data is None or 'id' not in token_data:
        return

    user = await Users.get_user_by_id(token_data['id'])
    if not user:
        return

    note = await Notes.get_note_by_id(data['note_id'])
    if not note:
        log.error(f'Note {data["note_id"]} not found for user {user.id}')
        return

    # Owner and admins are always allowed; others need an explicit read grant.
    is_owner_or_admin = user.role == 'admin' or user.id == note.user_id
    if not is_owner_or_admin and not await AccessGrants.has_access(
        user_id=user.id,
        resource_type='note',
        resource_id=note.id,
        permission='read',
    ):
        log.error(f'User {user.id} does not have access to note {data["note_id"]}')
        return

    log.debug(f'Joining note {note.id} for user {user.id}')
    await sio.enter_room(sid, f'note:{note.id}')
+
+
@sio.on('events:channel')
async def channel_events(sid, data):
    """Handle client channel events (typing indicator, read receipts)."""
    room = f'channel:{data["channel_id"]}'
    members = sio.manager.get_participants(namespace='/', room=room)

    # Only members of the channel room may emit events into it.
    if sid not in [member_sid for member_sid, _ in members]:
        return

    event_data = data['data']
    event_type = event_data['type']

    user = SESSION_POOL.get(sid)
    if not user:
        return

    if event_type == 'typing':
        # Relay the typing indicator to everyone in the channel room.
        await sio.emit(
            'events:channel',
            {
                'channel_id': data['channel_id'],
                'message_id': data.get('message_id', None),
                'data': event_data,
                'user': UserNameResponse(**user).model_dump(),
            },
            room=room,
        )
    elif event_type == 'last_read_at':
        await Channels.update_member_last_read_at(data['channel_id'], user['id'])
+
+
@sio.on('events:chat')
async def chat_events(sid, data):
    """Handle client chat events (currently only read receipts)."""
    user = SESSION_POOL.get(sid)
    if user is None:
        return

    event_type = data.get('data', {}).get('type')
    if event_type == 'last_read_at':
        await Chats.update_chat_last_read_at_by_id(data['chat_id'], user['id'])
+
+
def normalize_document_id(document_id: str) -> str:
    """Canonicalize document IDs to prevent auth bypass via prefix variants.

    YdocManager normalizes storage keys by replacing ":" with "_", so
    "note_abc" and "note:abc" resolve to the same underlying document.
    Underscore-prefixed IDs are rewritten back to the colon form so that
    authorization checks (which key on "note:") always fire.
    """
    prefix = 'note_'
    if document_id.startswith(prefix):
        return 'note:' + document_id[len(prefix):]
    return document_id
+
+
@sio.on('ydoc:document:join')
async def ydoc_document_join(sid, data):
    """Handle user joining a document.

    Verifies note read-access, registers the session with YDOC_MANAGER,
    joins the doc_{id} Socket.IO room, sends the full document state to the
    joiner, and announces the join to the other participants.
    """
    user = SESSION_POOL.get(sid)
    if not user:
        return

    try:
        # Canonicalize so "note_x" cannot bypass the "note:"-keyed auth check.
        document_id = normalize_document_id(data['document_id'])

        if document_id.startswith('note:'):
            note_id = document_id.split(':')[1]
            note = await Notes.get_note_by_id(note_id)
            if not note:
                log.error(f'Note {note_id} not found')
                return

            # Read access: admin, owner, or an explicit 'read' grant.
            if (
                user.get('role') != 'admin'
                and user.get('id') != note.user_id
                and not await AccessGrants.has_access(
                    user_id=user.get('id'),
                    resource_type='note',
                    resource_id=note.id,
                    permission='read',
                )
            ):
                log.error(f'User {user.get("id")} does not have access to note {note_id}')
                return

        # Presence metadata is client-supplied (display only, not used for auth).
        user_id = data.get('user_id', sid)
        user_name = data.get('user_name', 'Anonymous')
        user_color = data.get('user_color', '#000000')

        log.info(f'User {user_id} joining document {document_id}')
        # The manager tracks the socket sid, not the client-supplied user_id.
        await YDOC_MANAGER.add_user(document_id=document_id, user_id=sid)

        # Join Socket.IO room
        await sio.enter_room(sid, f'doc_{document_id}')

        active_session_ids = get_session_ids_from_room(f'doc_{document_id}')

        # Materialize the document by replaying every stored update.
        ydoc = Y.Doc()
        updates = await YDOC_MANAGER.get_updates(document_id)
        for update in updates:
            ydoc.apply_update(bytes(update))

        # Encode the entire document state as one update for the joiner.
        state_update = ydoc.get_update()
        await sio.emit(
            'ydoc:document:state',
            {
                'document_id': document_id,
                'state': list(state_update),  # Convert bytes to list for JSON
                'sessions': active_session_ids,
            },
            room=sid,
        )

        # Notify other users about the new user
        await sio.emit(
            'ydoc:user:joined',
            {
                'document_id': document_id,
                'user_id': user_id,
                'user_name': user_name,
                'user_color': user_color,
            },
            room=f'doc_{document_id}',
            skip_sid=sid,
        )

        log.info(f'User {user_id} successfully joined document {document_id}')

    except Exception as e:
        log.error(f'Error in yjs_document_join: {e}')
        await sio.emit('error', {'message': 'Failed to join document'}, room=sid)
+
+
async def document_save_handler(document_id, data, user):
    """Persist collaborative-edit payloads back to the underlying note.

    Re-checks write access (admin, owner, or an explicit 'write' grant)
    before saving. *user* is a SESSION_POOL dict, not a Users model.
    Documents that are not "note:"-prefixed are silently ignored.
    """
    document_id = normalize_document_id(document_id)

    if document_id.startswith('note:'):
        note_id = document_id.split(':')[1]
        note = await Notes.get_note_by_id(note_id)
        if not note:
            log.error(f'Note {note_id} not found')
            return

        # Write access: admin, owner, or an explicit 'write' grant.
        if (
            user.get('role') != 'admin'
            and user.get('id') != note.user_id
            and not await AccessGrants.has_access(
                user_id=user.get('id'),
                resource_type='note',
                resource_id=note.id,
                permission='write',
            )
        ):
            log.error(f'User {user.get("id")} does not have write access to note {note_id}')
            return

        await Notes.update_note_by_id(note_id, NoteUpdateForm(data=data))
+
+
@sio.on('ydoc:document:state')
async def yjs_document_state(sid, data):
    """Send the current state of the Yjs document to the user.

    The requester must already be a member of the document's room (which is
    only entered after the access check in ydoc_document_join), so room
    membership doubles as the authorization check here.
    """
    try:
        document_id = data['document_id']

        # Canonicalize so room names match those created at join time.
        document_id = normalize_document_id(document_id)
        room = f'doc_{document_id}'

        active_session_ids = get_session_ids_from_room(room)

        if sid not in active_session_ids:
            log.warning(f'Session {sid} not in room {room}. Cannot send state.')
            return

        if not await YDOC_MANAGER.document_exists(document_id):
            log.warning(f'Document {document_id} not found')
            return

        # Materialize the document by replaying every stored update.
        ydoc = Y.Doc()
        updates = await YDOC_MANAGER.get_updates(document_id)
        for update in updates:
            ydoc.apply_update(bytes(update))

        # Encode the entire document state as one update.
        state_update = ydoc.get_update()

        await sio.emit(
            'ydoc:document:state',
            {
                'document_id': document_id,
                'state': list(state_update),  # Convert bytes to list for JSON
                'sessions': active_session_ids,
            },
            room=sid,
        )
    except Exception as e:
        log.error(f'Error in yjs_document_state: {e}')
+
+
@sio.on('ydoc:document:update')
async def yjs_document_update(sid, data):
    """Handle Yjs document updates.

    Verifies room membership AND write permission, appends the update to the
    document's update log, relays it to other participants, and schedules a
    debounced save of the accompanying payload (if any).
    """
    try:
        document_id = data['document_id']

        # Canonicalize so "note_x" cannot bypass the "note:"-keyed auth check.
        document_id = normalize_document_id(document_id)

        # Verify the sender actually joined this document room
        room = f'doc_{document_id}'
        active_session_ids = get_session_ids_from_room(room)
        if sid not in active_session_ids:
            log.warning(f'Session {sid} not in room {room}. Rejecting update.')
            return

        # Verify write permission — room membership only proves read access
        user = SESSION_POOL.get(sid)
        if not user:
            return

        if document_id.startswith('note:'):
            note_id = document_id.split(':')[1]
            note = await Notes.get_note_by_id(note_id)
            if not note:
                log.error(f'Note {note_id} not found')
                return

            if (
                user.get('role') != 'admin'
                and user.get('id') != note.user_id
                and not await AccessGrants.has_access(
                    user_id=user.get('id'),
                    resource_type='note',
                    resource_id=note.id,
                    permission='write',
                )
            ):
                log.warning(f'User {user.get("id")} does not have write access to note {note_id}. Rejecting update.')
                return

        # Best-effort: cancel any pending debounced save for this document so
        # it can be re-scheduled below with the latest payload.
        try:
            await stop_item_tasks(REDIS, document_id)
        except Exception:
            pass

        user_id = data.get('user_id', sid)

        update = data['update']  # List of bytes from frontend

        await YDOC_MANAGER.append_to_updates(
            document_id=document_id,
            update=update,  # stored as-is; the manager JSON-serializes it
        )

        # Broadcast update to all other users in the document
        await sio.emit(
            'ydoc:document:update',
            {
                'document_id': document_id,
                'user_id': user_id,
                'update': update,
                'socket_id': sid,  # Add socket_id to match frontend filtering
            },
            room=f'doc_{document_id}',
            skip_sid=sid,
        )

        async def debounced_save():
            # Wait briefly so rapid successive edits collapse into one save.
            await asyncio.sleep(0.5)
            await document_save_handler(document_id, data.get('data', {}), user)

        if data.get('data'):
            await create_task(REDIS, debounced_save(), document_id)

    except Exception as e:
        log.error(f'Error in yjs_document_update: {e}')
+
+
@sio.on('ydoc:document:leave')
async def yjs_document_leave(sid, data):
    """Handle user leaving a document.

    Deregisters the session, leaves the room, notifies the remaining
    participants, and clears the document's stored updates once the last
    user has left.
    """
    try:
        document_id = normalize_document_id(data['document_id'])
        user_id = data.get('user_id', sid)

        log.info(f'User {user_id} leaving document {document_id}')

        # Remove user from the document (keyed by socket sid, see join).
        await YDOC_MANAGER.remove_user(document_id=document_id, user_id=sid)

        # Leave Socket.IO room
        await sio.leave_room(sid, f'doc_{document_id}')

        # Notify other users
        await sio.emit(
            'ydoc:user:left',
            {'document_id': document_id, 'user_id': user_id},
            room=f'doc_{document_id}',
        )

        # Garbage-collect the update log when nobody is editing anymore.
        if await YDOC_MANAGER.document_exists(document_id) and len(await YDOC_MANAGER.get_users(document_id)) == 0:
            log.info(f'Cleaning up document {document_id} as no users are left')
            await YDOC_MANAGER.clear_document(document_id)

    except Exception as e:
        log.error(f'Error in yjs_document_leave: {e}')
+
+
@sio.on('ydoc:awareness:update')
async def yjs_awareness_update(sid, data):
    """Handle awareness updates (cursors, selections, etc.).

    Relays the update to everyone else in the document's room. The sender
    must itself be a member of that room — previously any connected session
    could broadcast spoofed presence into arbitrary documents; this now
    performs the same membership check as the document:update handler.
    The ID is also normalized so the room name matches the one joined in
    ydoc_document_join (the raw ID previously addressed a different room).
    """
    try:
        document_id = normalize_document_id(data['document_id'])
        user_id = data.get('user_id', sid)
        update = data['update']

        room = f'doc_{document_id}'
        if sid not in get_session_ids_from_room(room):
            log.warning(f'Session {sid} not in room {room}. Rejecting awareness update.')
            return

        # Broadcast awareness update to all other users in the document
        await sio.emit(
            'ydoc:awareness:update',
            {'document_id': document_id, 'user_id': user_id, 'update': update},
            room=room,
            skip_sid=sid,
        )

    except Exception as e:
        log.error(f'Error in yjs_awareness_update: {e}')
+
+
@sio.event
async def disconnect(sid):
    """Tear down all server-side state for a disconnecting session.

    Removes the session record, prunes it from every model's usage entry,
    and drops it from all collaborative documents. Unknown sids are ignored.
    (Cleanup: removed an unused local and a dead `else: pass` branch.)
    """
    if sid not in SESSION_POOL:
        return

    del SESSION_POOL[sid]

    # Clean up USAGE_POOL entries for this session. Values are re-assigned
    # rather than only mutated in place because USAGE_POOL may be a
    # Redis-backed mapping whose __getitem__ returns a deserialized copy —
    # NOTE(review): presumed from the RedisDict pattern in utils; confirm.
    for model_id in list(USAGE_POOL.keys()):
        connections = USAGE_POOL.get(model_id)
        if connections and sid in connections:
            del connections[sid]
            if not connections:
                del USAGE_POOL[model_id]
            else:
                USAGE_POOL[model_id] = connections

    await YDOC_MANAGER.remove_user_from_all_documents(sid)
+
+
async def get_event_emitter(request_info, update_db=True):
    """Build an emitter coroutine bound to one chat message.

    The returned coroutine sends the event to the owning user's room and,
    when update_db is True, mirrors certain event types into the chat's
    stored message. Returns None if request_info lacks user_id, chat_id or
    message_id.
    """

    async def __event_emitter__(event_data):
        user_id = request_info['user_id']
        chat_id = request_info['chat_id']
        message_id = request_info['message_id']

        # Fan out to every session of the owning user.
        await sio.emit(
            'events',
            {
                'chat_id': chat_id,
                'message_id': message_id,
                'data': event_data,
            },
            room=f'user:{user_id}',
        )

        # "local:" chats are client-side only and are never persisted.
        if update_db and message_id and not request_info.get('chat_id', '').startswith('local:'):
            event_type = event_data.get('type')

            if event_type == 'status':
                await Chats.add_message_status_to_chat_by_id_and_message_id(
                    request_info['chat_id'],
                    request_info['message_id'],
                    event_data.get('data', {}),
                )

            elif event_type == 'message':
                # Append streamed content to whatever is already stored.
                message = await Chats.get_message_by_id_and_message_id(
                    request_info['chat_id'],
                    request_info['message_id'],
                )

                if message:
                    content = message.get('content', '')
                    content += event_data.get('data', {}).get('content', '')

                    await Chats.upsert_message_to_chat_by_id_and_message_id(
                        request_info['chat_id'],
                        request_info['message_id'],
                        {
                            'content': content,
                        },
                    )

            elif event_type == 'replace':
                # Overwrite the stored content entirely.
                content = event_data.get('data', {}).get('content', '')

                await Chats.upsert_message_to_chat_by_id_and_message_id(
                    request_info['chat_id'],
                    request_info['message_id'],
                    {
                        'content': content,
                    },
                )

            elif event_type == 'embeds':
                # NOTE(review): unlike the 'message' branch, this assumes the
                # message exists — message.get would fail if it is None; confirm.
                message = await Chats.get_message_by_id_and_message_id(
                    request_info['chat_id'],
                    request_info['message_id'],
                )

                # New embeds are prepended ahead of the existing ones.
                embeds = event_data.get('data', {}).get('embeds', [])
                embeds.extend(message.get('embeds', []))

                await Chats.upsert_message_to_chat_by_id_and_message_id(
                    request_info['chat_id'],
                    request_info['message_id'],
                    {
                        'embeds': embeds,
                    },
                )

            elif event_type == 'files':
                # NOTE(review): same missing-message assumption as 'embeds'.
                message = await Chats.get_message_by_id_and_message_id(
                    request_info['chat_id'],
                    request_info['message_id'],
                )

                # New files are prepended ahead of the existing ones.
                files = event_data.get('data', {}).get('files', [])
                files.extend(message.get('files', []))

                await Chats.upsert_message_to_chat_by_id_and_message_id(
                    request_info['chat_id'],
                    request_info['message_id'],
                    {
                        'files': files,
                    },
                )

            elif event_type in ('source', 'citation'):
                data = event_data.get('data', {})
                # Only untyped sources are persisted; typed ones (e.g. code
                # execution results — TODO confirm) are handled elsewhere.
                if data.get('type') is None:
                    message = await Chats.get_message_by_id_and_message_id(
                        request_info['chat_id'],
                        request_info['message_id'],
                    )

                    sources = message.get('sources', [])
                    sources.append(data)

                    await Chats.upsert_message_to_chat_by_id_and_message_id(
                        request_info['chat_id'],
                        request_info['message_id'],
                        {
                            'sources': sources,
                        },
                    )

    # Only hand back the emitter when it has everything it needs to run.
    if 'user_id' in request_info and 'chat_id' in request_info and 'message_id' in request_info:
        return __event_emitter__
    else:
        return None
+
+
async def get_event_call(request_info):
    """Build a caller coroutine that round-trips an 'events' message to the
    client session and returns its response.

    Returns None unless request_info contains session_id, chat_id and
    message_id.
    """
    required_keys = ('session_id', 'chat_id', 'message_id')
    if not all(key in request_info for key in required_keys):
        return None

    async def __event_caller__(event_data):
        return await sio.call(
            'events',
            {
                'chat_id': request_info.get('chat_id', None),
                'message_id': request_info.get('message_id', None),
                'data': event_data,
            },
            to=request_info['session_id'],
            timeout=WEBSOCKET_EVENT_CALLER_TIMEOUT,
        )

    return __event_caller__
+
+
+get_event_caller = get_event_call
diff --git a/backend/open_webui/socket/utils.py b/backend/open_webui/socket/utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..16b0cc3855a62aed37937cc61a9f98a429dbfa04
--- /dev/null
+++ b/backend/open_webui/socket/utils.py
@@ -0,0 +1,263 @@
+import json
+import uuid
+from open_webui.utils.redis import get_redis_connection
+from open_webui.env import REDIS_KEY_PREFIX
+from typing import Optional, List, Tuple
+import pycrdt as Y
+
+
class RedisLock:
    """A best-effort distributed lock stored as a single expiring Redis key.

    The lock is taken with SET NX EX and must be renewed by the holder
    before timeout_secs elapses, otherwise it silently expires.
    """

    def __init__(
        self,
        redis_url,
        lock_name,
        timeout_secs,
        redis_sentinels=[],
        redis_cluster=False,
    ):
        self.lock_name = lock_name
        # Unique per-instance id so only the holder can release its own lock.
        self.lock_id = str(uuid.uuid4())
        self.timeout_secs = timeout_secs
        self.lock_obtained = False
        self.redis = get_redis_connection(
            redis_url,
            redis_sentinels,
            redis_cluster=redis_cluster,
            decode_responses=True,
        )

    def aquire_lock(self):
        """Try to take the lock; returns truthy on success.

        Historical misspelling — prefer acquire_lock(); this name is kept so
        existing callers keep working.
        """
        # nx=True will only set this key if it _hasn't_ already been set
        self.lock_obtained = self.redis.set(self.lock_name, self.lock_id, nx=True, ex=self.timeout_secs)
        return self.lock_obtained

    def acquire_lock(self):
        """Correctly spelled alias for aquire_lock()."""
        return self.aquire_lock()

    def renew_lock(self):
        """Extend the lock's expiry; succeeds only while the key still exists."""
        # xx=True will only set this key if it _has_ already been set
        return self.redis.set(self.lock_name, self.lock_id, xx=True, ex=self.timeout_secs)

    def release_lock(self):
        """Delete the lock, but only if this instance still owns it.

        NOTE(review): GET-then-DELETE is not atomic — if the key expires and
        is re-acquired by another instance between the two calls, that
        instance's lock gets deleted. An atomic Lua compare-and-delete would
        close this window.
        """
        lock_value = self.redis.get(self.lock_name)
        if lock_value and lock_value == self.lock_id:
            self.redis.delete(self.lock_name)
+
+
class RedisDict:
    """A dict-like mapping backed by a single Redis hash.

    Values are JSON-serialized, so __getitem__ returns a fresh deserialized
    copy — mutating a returned value does NOT persist; callers must assign
    back through __setitem__.
    """

    def __init__(self, name, redis_url, redis_sentinels=[], redis_cluster=False):
        # `name` is the Redis key of the backing hash.
        self.name = name
        self.redis = get_redis_connection(
            redis_url,
            redis_sentinels,
            redis_cluster=redis_cluster,
            decode_responses=True,
        )

    def __setitem__(self, key, value):
        # Values must be JSON-serializable.
        serialized_value = json.dumps(value)
        self.redis.hset(self.name, key, serialized_value)

    def __getitem__(self, key):
        value = self.redis.hget(self.name, key)
        if value is None:
            raise KeyError(key)
        return json.loads(value)

    def __delitem__(self, key):
        result = self.redis.hdel(self.name, key)
        if result == 0:
            raise KeyError(key)

    def __contains__(self, key):
        return self.redis.hexists(self.name, key)

    def __len__(self):
        return self.redis.hlen(self.name)

    def keys(self):
        return self.redis.hkeys(self.name)

    def values(self):
        return [json.loads(v) for v in self.redis.hvals(self.name)]

    def items(self):
        return [(k, json.loads(v)) for k, v in self.redis.hgetall(self.name).items()]

    def set(self, mapping: dict):
        """Replace the hash's entire contents with *mapping*, without ever
        emptying the hash in between (concurrent readers always see data)."""
        if not mapping:
            self.redis.delete(self.name)
            return

        # Fetch existing keys before writing so we know which ones to remove.
        # HKEYS is cheap — it transfers only short key strings, not large JSON values.
        existing_keys = set(self.redis.hkeys(self.name))
        new_keys = set(mapping.keys())
        keys_to_remove = existing_keys - new_keys

        # HSET first (add/update all new values), then HDEL (remove stale keys).
        # We never DELETE the whole hash — this eliminates the race window
        # where concurrent readers would see an empty models dict.
        self.redis.hset(self.name, mapping={k: json.dumps(v) for k, v in mapping.items()})
        if keys_to_remove:
            self.redis.hdel(self.name, *keys_to_remove)

    def get(self, key, default=None):
        try:
            return self[key]
        except KeyError:
            return default

    def clear(self):
        # Removes the backing hash entirely.
        self.redis.delete(self.name)

    def update(self, other=None, **kwargs):
        # Accepts a mapping or an iterable of (key, value) pairs, like dict.update.
        if other is not None:
            for k, v in other.items() if hasattr(other, 'items') else other:
                self[k] = v
        for k, v in kwargs.items():
            self[k] = v

    def setdefault(self, key, default=None):
        # NOTE(review): check-then-set is not atomic across processes; a
        # concurrent writer can slip in between `in` and the assignment.
        if key not in self:
            self[key] = default
        return self[key]
+
+
class YdocManager:
    """Tracks collaborative Yjs documents: their update logs and active users.

    Backed by Redis when a client is supplied, otherwise by in-process dicts.
    Document IDs are normalized by replacing ":" with "_" so they are safe
    inside colon-delimited Redis keys. Update logs are periodically compacted
    by squashing the oldest half into a single snapshot.

    Fixes over the previous revision:
    - the in-memory branch of remove_user_from_all_documents cleared a
      document's updates even when other users were still editing it; it now
      only clears once the last user leaves (matching the Redis branch);
    - get_users now returns a list in memory mode too (it returned a set);
    - the Redis key prefix default is resolved lazily in __init__ instead of
      at class-definition time (same resulting value).
    """

    # Compact the update log once it reaches this many entries.
    COMPACTION_THRESHOLD = 500

    def __init__(
        self,
        redis=None,
        redis_key_prefix: Optional[str] = None,
    ):
        # In-memory fallbacks, used only when no Redis client is provided.
        self._updates = {}
        self._users = {}
        self._redis = redis
        if redis_key_prefix is None:
            redis_key_prefix = f'{REDIS_KEY_PREFIX}:ydoc:documents'
        self._redis_key_prefix = redis_key_prefix

    async def append_to_updates(self, document_id: str, update: bytes):
        """Append one Yjs update to the document's log, compacting if large."""
        document_id = document_id.replace(':', '_')
        if self._redis:
            redis_key = f'{self._redis_key_prefix}:{document_id}:updates'
            await self._redis.rpush(redis_key, json.dumps(list(update)))
            list_len = await self._redis.llen(redis_key)
            if list_len >= self.COMPACTION_THRESHOLD:
                await self._compact_updates_redis(document_id)
        else:
            if document_id not in self._updates:
                self._updates[document_id] = []
            self._updates[document_id].append(update)
            if len(self._updates[document_id]) >= self.COMPACTION_THRESHOLD:
                self._compact_updates_memory(document_id)

    async def _compact_updates_redis(self, document_id: str):
        """Rolling compaction: squash oldest half into one snapshot."""
        redis_key = f'{self._redis_key_prefix}:{document_id}:updates'
        all_updates = await self._redis.lrange(redis_key, 0, -1)
        if len(all_updates) <= 1:
            return
        mid = len(all_updates) // 2
        ydoc = Y.Doc()
        for raw in all_updates[:mid]:
            ydoc.apply_update(bytes(json.loads(raw)))
        snapshot = json.dumps(list(ydoc.get_update()))
        # Pipeline so the delete+rewrite round-trips together.
        pipe = self._redis.pipeline()
        pipe.delete(redis_key)
        pipe.rpush(redis_key, snapshot, *all_updates[mid:])
        await pipe.execute()

    def _compact_updates_memory(self, document_id: str):
        """Rolling compaction: squash oldest half into one snapshot."""
        updates = self._updates.get(document_id, [])
        if len(updates) <= 1:
            return
        mid = len(updates) // 2
        ydoc = Y.Doc()
        for update in updates[:mid]:
            ydoc.apply_update(bytes(update))
        self._updates[document_id] = [ydoc.get_update()] + updates[mid:]

    async def get_updates(self, document_id: str) -> List[bytes]:
        """Return the document's full update log, oldest first."""
        document_id = document_id.replace(':', '_')

        if self._redis:
            redis_key = f'{self._redis_key_prefix}:{document_id}:updates'
            updates = await self._redis.lrange(redis_key, 0, -1)
            return [bytes(json.loads(update)) for update in updates]
        else:
            return self._updates.get(document_id, [])

    async def document_exists(self, document_id: str) -> bool:
        """True if the document has at least one stored update."""
        document_id = document_id.replace(':', '_')

        if self._redis:
            redis_key = f'{self._redis_key_prefix}:{document_id}:updates'
            return await self._redis.exists(redis_key) > 0
        else:
            return document_id in self._updates

    async def get_users(self, document_id: str) -> List[str]:
        """Return the IDs of users currently attached to the document."""
        document_id = document_id.replace(':', '_')

        if self._redis:
            redis_key = f'{self._redis_key_prefix}:{document_id}:users'
            users = await self._redis.smembers(redis_key)
            return list(users)
        else:
            return list(self._users.get(document_id, []))

    async def add_user(self, document_id: str, user_id: str):
        """Attach a user (by socket sid at the call sites) to the document."""
        document_id = document_id.replace(':', '_')

        if self._redis:
            redis_key = f'{self._redis_key_prefix}:{document_id}:users'
            await self._redis.sadd(redis_key, user_id)
        else:
            if document_id not in self._users:
                self._users[document_id] = set()
            self._users[document_id].add(user_id)

    async def remove_user(self, document_id: str, user_id: str):
        """Detach a user from the document; no-op if not attached."""
        document_id = document_id.replace(':', '_')

        if self._redis:
            redis_key = f'{self._redis_key_prefix}:{document_id}:users'
            await self._redis.srem(redis_key, user_id)
        else:
            if document_id in self._users and user_id in self._users[document_id]:
                self._users[document_id].remove(user_id)

    async def remove_user_from_all_documents(self, user_id: str):
        """Detach a user everywhere; clear documents left without users."""
        if self._redis:
            keys = []
            async for key in self._redis.scan_iter(match=f'{self._redis_key_prefix}:*', count=100):
                keys.append(key)
            for key in keys:
                if key.endswith(':users'):
                    await self._redis.srem(key, user_id)

                    # Key layout is {prefix}:{doc_id}:users; doc_id contains
                    # no colons because of the ':'->'_' normalization.
                    document_id = key.split(':')[-2]
                    if len(await self.get_users(document_id)) == 0:
                        await self.clear_document(document_id)

        else:
            for document_id in list(self._users.keys()):
                if user_id in self._users[document_id]:
                    self._users[document_id].remove(user_id)
                    if not self._users[document_id]:
                        del self._users[document_id]
                        # Only clear once the LAST user left (matches the
                        # Redis branch); the old code cleared unconditionally,
                        # wiping documents that still had active editors.
                        await self.clear_document(document_id)

    async def clear_document(self, document_id: str):
        """Drop the document's update log and user set."""
        document_id = document_id.replace(':', '_')

        if self._redis:
            redis_key = f'{self._redis_key_prefix}:{document_id}:updates'
            await self._redis.delete(redis_key)
            redis_users_key = f'{self._redis_key_prefix}:{document_id}:users'
            await self._redis.delete(redis_users_key)
        else:
            if document_id in self._updates:
                del self._updates[document_id]
            if document_id in self._users:
                del self._users[document_id]
diff --git a/backend/open_webui/static/apple-touch-icon.png b/backend/open_webui/static/apple-touch-icon.png
new file mode 100644
index 0000000000000000000000000000000000000000..9807373436540a5b80ae43960cd3cb86f31eec4f
Binary files /dev/null and b/backend/open_webui/static/apple-touch-icon.png differ
diff --git a/backend/open_webui/static/assets/pdf-style.css b/backend/open_webui/static/assets/pdf-style.css
new file mode 100644
index 0000000000000000000000000000000000000000..644dd58ae6f821f1768db34eee65dcb48e3fbf49
--- /dev/null
+++ b/backend/open_webui/static/assets/pdf-style.css
@@ -0,0 +1,315 @@
/* HTML and Body */

/* CJK + Latin Noto faces bundled for PDF rendering.
   Note: NotoSansSC is registered twice — a variable font as 'NotoSansSC'
   and a static weight as 'NotoSansSC-Regular' (separate family names). */
@font-face {
  font-family: 'NotoSans';
  src: url('fonts/NotoSans-Variable.ttf');
}

@font-face {
  font-family: 'NotoSansJP';
  src: url('fonts/NotoSansJP-Variable.ttf');
}

@font-face {
  font-family: 'NotoSansKR';
  src: url('fonts/NotoSansKR-Variable.ttf');
}

@font-face {
  font-family: 'NotoSansSC';
  src: url('fonts/NotoSansSC-Variable.ttf');
}

@font-face {
  font-family: 'NotoSansSC-Regular';
  src: url('fonts/NotoSansSC-Regular.ttf');
}

html {
  font-family:
    -apple-system, BlinkMacSystemFont, 'Segoe UI', 'NotoSans', 'NotoSansJP', 'NotoSansKR',
    'NotoSansSC', 'Twemoji', 'STSong-Light', 'MSung-Light', 'HeiseiMin-W3', 'HYSMyeongJo-Medium',
    Roboto, 'Helvetica Neue', Arial, sans-serif;
  font-size: 14px; /* Default font size */
  line-height: 1.5;
}

/* Inherit box-sizing everywhere (set the base value on html if needed). */
*,
*::before,
*::after {
  box-sizing: inherit;
}

body {
  margin: 0;
  padding: 0;
  background-color: #fff;
  width: auto;
}
+
/* Typography */
h1,
h2,
h3,
h4,
h5,
h6 {
  font-weight: 500;
  margin: 0;
}

h1 {
  font-size: 2.5rem;
}

h2 {
  font-size: 2rem;
}

h3 {
  font-size: 1.75rem;
}

h4 {
  font-size: 1.5rem;
}

h5 {
  font-size: 1.25rem;
}

h6 {
  font-size: 1rem;
}

p {
  margin-top: 0;
  margin-bottom: 1rem;
}

/* Grid System */
.container {
  width: 100%;
  padding-right: 15px;
  padding-left: 15px;
  margin-right: auto;
  margin-left: auto;
}

/* Utilities */
.text-center {
  text-align: center;
}

/* Additional Text Utilities */
.text-muted {
  color: #6c757d; /* Muted text color */
}

/* Small Text */
small {
  font-size: 80%; /* Smaller font size relative to the base */
  color: #6c757d; /* Lighter text color for secondary information */
  margin-bottom: 0;
  margin-top: 0;
}

/* Strong Element Styles */
strong {
  font-weight: bolder; /* Ensures the text is bold */
  color: inherit; /* Inherits the color from its parent element */
}

/* link */
a {
  color: #007bff;
  text-decoration: none;
  background-color: transparent;
}

a:hover {
  color: #0056b3;
  text-decoration: underline;
}
+
/* General styles for lists */
/* NOTE(review): applying padding/margin to `li` as well as `ol`/`ul`
   compounds the indent on nested items — verify intended rendering. */
ol,
ul,
li {
  padding-left: 40px; /* Increase padding to move bullet points to the right */
  margin-left: 20px; /* Indent lists from the left */
}

/* Ordered list styles */
ol {
  list-style-type: decimal; /* Use numbers for ordered lists */
  margin-bottom: 10px; /* Space after each list */
}

ol li {
  margin-bottom: 0.5rem; /* Space between ordered list items */
}

/* Unordered list styles */
ul {
  list-style-type: disc; /* Use bullets for unordered lists */
  margin-bottom: 10px; /* Space after each list */
}

ul li {
  margin-bottom: 0.5rem; /* Space between unordered list items */
}

/* List item styles */
li {
  margin-bottom: 5px; /* Space between list items */
  line-height: 1.5; /* Line height for better readability */
}

/* Nested lists */
ol ol,
ol ul,
ul ol,
ul ul {
  padding-left: 20px;
  margin-left: 30px; /* Further indent nested lists */
  margin-bottom: 0; /* Remove extra margin at the bottom of nested lists */
}

/* Code blocks */
pre {
  background-color: #f4f4f4;
  padding: 10px;
  overflow-x: auto;
  max-width: 100%; /* Ensure it doesn't overflow the page */
  width: 80%; /* Set a specific width for a container-like appearance */
  margin: 0 1em; /* Center the pre block */
  box-sizing: border-box; /* Include padding in the width */
  border: 1px solid #ccc; /* Optional: Add a border for better definition */
  border-radius: 4px; /* Optional: Add rounded corners */
}

code {
  font-family: 'Courier New', Courier, monospace;
  background-color: #f4f4f4;
  padding: 2px 4px;
  border-radius: 4px;
  box-sizing: border-box; /* Include padding in the width */
}

/* One chat message in the rendered transcript. */
.message {
  margin-top: 8px;
  margin-bottom: 8px;
  max-width: 100%;
  overflow-wrap: break-word;
}

/* Table Styles */
table {
  width: 100%;
  margin-bottom: 1rem;
  color: #212529;
  border-collapse: collapse; /* Removes the space between borders */
}

th,
td {
  margin: 0;
  padding: 0.75rem;
  vertical-align: top;
  border-top: 1px solid #dee2e6;
}

thead th {
  vertical-align: bottom;
  border-bottom: 2px solid #dee2e6;
}

tbody + tbody {
  border-top: 2px solid #dee2e6;
}

/* markdown-section styles */
.markdown-section blockquote,
.markdown-section h1,
.markdown-section h2,
.markdown-section h3,
.markdown-section h4,
.markdown-section h5,
.markdown-section h6,
.markdown-section p,
.markdown-section pre,
.markdown-section table,
.markdown-section ul {
  /* Give most block elements margin top and bottom */
  margin-top: 1rem;
}

/* Remove top margin if it's the first child */
.markdown-section blockquote:first-child,
.markdown-section h1:first-child,
.markdown-section h2:first-child,
.markdown-section h3:first-child,
.markdown-section h4:first-child,
.markdown-section h5:first-child,
.markdown-section h6:first-child,
.markdown-section p:first-child,
.markdown-section pre:first-child,
.markdown-section table:first-child,
.markdown-section ul:first-child {
  margin-top: 0;
}
+
+/* Remove top margin of