| | from open_webui.routers.images import ( |
| | get_image_data, |
| | upload_image, |
| | ) |
| |
|
| | from fastapi import ( |
| | APIRouter, |
| | Depends, |
| | HTTPException, |
| | Request, |
| | UploadFile, |
| | ) |
| | from typing import Optional |
| | from pathlib import Path |
| |
|
| | from open_webui.storage.provider import Storage |
| |
|
| | from open_webui.models.chats import Chats |
| | from open_webui.models.files import Files |
| | from open_webui.routers.files import upload_file_handler |
| | from open_webui.retrieval.web.utils import validate_url |
| |
|
| | import mimetypes |
| | import base64 |
| | import io |
| | import re |
| |
|
| | import requests |
| |
|
| | BASE64_IMAGE_URL_PREFIX = re.compile(r"data:image/\w+;base64,", re.IGNORECASE) |
| | MARKDOWN_IMAGE_URL_PATTERN = re.compile(r"!\[(.*?)\]\((.+?)\)", re.IGNORECASE) |
| |
|
| |
|
| | def get_image_base64_from_url(url: str) -> Optional[str]: |
| | try: |
| | if url.startswith("http"): |
| | |
| | validate_url(url) |
| | |
| | response = requests.get(url) |
| | response.raise_for_status() |
| | image_data = response.content |
| | encoded_string = base64.b64encode(image_data).decode("utf-8") |
| | content_type = response.headers.get("Content-Type", "image/png") |
| | return f"data:{content_type};base64,{encoded_string}" |
| | else: |
| | file = Files.get_file_by_id(url) |
| |
|
| | if not file: |
| | return None |
| |
|
| | file_path = Storage.get_file(file.path) |
| | file_path = Path(file_path) |
| |
|
| | if file_path.is_file(): |
| | with open(file_path, "rb") as image_file: |
| | encoded_string = base64.b64encode(image_file.read()).decode("utf-8") |
| | content_type, _ = mimetypes.guess_type(file_path.name) |
| | return f"data:{content_type};base64,{encoded_string}" |
| | else: |
| | return None |
| |
|
| | except Exception as e: |
| | return None |
| |
|
| |
|
| | def get_image_url_from_base64(request, base64_image_string, metadata, user): |
| | if BASE64_IMAGE_URL_PREFIX.match(base64_image_string): |
| | image_url = "" |
| | |
| | image_data, content_type = get_image_data(base64_image_string) |
| | if image_data is not None: |
| | _, image_url = upload_image( |
| | request, |
| | image_data, |
| | content_type, |
| | metadata, |
| | user, |
| | ) |
| |
|
| | return image_url |
| | return None |
| |
|
| |
|
| | def convert_markdown_base64_images(request, content: str, metadata, user): |
| | def replace(match): |
| | base64_string = match.group(2) |
| | MIN_REPLACEMENT_URL_LENGTH = 1024 |
| | if len(base64_string) > MIN_REPLACEMENT_URL_LENGTH: |
| | url = get_image_url_from_base64(request, base64_string, metadata, user) |
| | if url: |
| | return f"" |
| | return match.group(0) |
| |
|
| | return MARKDOWN_IMAGE_URL_PATTERN.sub(replace, content) |
| |
|
| |
|
| | def load_b64_audio_data(b64_str): |
| | try: |
| | if "," in b64_str: |
| | header, b64_data = b64_str.split(",", 1) |
| | else: |
| | b64_data = b64_str |
| | header = "data:audio/wav;base64" |
| | audio_data = base64.b64decode(b64_data) |
| | content_type = ( |
| | header.split(";")[0].split(":")[1] if ";" in header else "audio/wav" |
| | ) |
| | return audio_data, content_type |
| | except Exception as e: |
| | print(f"Error decoding base64 audio data: {e}") |
| | return None, None |
| |
|
| |
|
| | def upload_audio(request, audio_data, content_type, metadata, user): |
| | audio_format = mimetypes.guess_extension(content_type) |
| | file = UploadFile( |
| | file=io.BytesIO(audio_data), |
| | filename=f"generated-{audio_format}", |
| | headers={ |
| | "content-type": content_type, |
| | }, |
| | ) |
| | file_item = upload_file_handler( |
| | request, |
| | file=file, |
| | metadata=metadata, |
| | process=False, |
| | user=user, |
| | ) |
| | url = request.app.url_path_for("get_file_content_by_id", id=file_item.id) |
| | return url |
| |
|
| |
|
| | def get_audio_url_from_base64(request, base64_audio_string, metadata, user): |
| | if "data:audio/wav;base64" in base64_audio_string: |
| | audio_url = "" |
| | |
| | audio_data, content_type = load_b64_audio_data(base64_audio_string) |
| | if audio_data is not None: |
| | audio_url = upload_audio( |
| | request, |
| | audio_data, |
| | content_type, |
| | metadata, |
| | user, |
| | ) |
| | return audio_url |
| | return None |
| |
|
| |
|
| | def get_file_url_from_base64(request, base64_file_string, metadata, user): |
| | if "data:image/png;base64" in base64_file_string: |
| | return get_image_url_from_base64(request, base64_file_string, metadata, user) |
| | elif "data:audio/wav;base64" in base64_file_string: |
| | return get_audio_url_from_base64(request, base64_file_string, metadata, user) |
| | return None |
| |
|
| |
|
| | def get_image_base64_from_file_id(id: str) -> Optional[str]: |
| | file = Files.get_file_by_id(id) |
| | if not file: |
| | return None |
| |
|
| | try: |
| | file_path = Storage.get_file(file.path) |
| | file_path = Path(file_path) |
| |
|
| | |
| | if file_path.is_file(): |
| | import base64 |
| |
|
| | with open(file_path, "rb") as image_file: |
| | encoded_string = base64.b64encode(image_file.read()).decode("utf-8") |
| | content_type, _ = mimetypes.guess_type(file_path.name) |
| | return f"data:{content_type};base64,{encoded_string}" |
| | else: |
| | return None |
| | except Exception as e: |
| | return None |
| |
|