|
import os |
|
import numpy as np |
|
import argparse |
|
import gradio as gr |
|
from typing import Any, AsyncGenerator, Callable, Dict, Iterator, List, Literal, Optional, Tuple, Union, cast
|
import filelock |
|
import glob |
|
import json |
|
import time |
|
from gradio.routes import Request |
|
from gradio.utils import SyncToAsyncIterator, async_iteration |
|
from gradio.helpers import special_args |
|
import anyio |
|
|
|
|
from gradio_client.documentation import document, set_documentation_group |
|
|
|
|
from tqdm.auto import tqdm |
|
from huggingface_hub import snapshot_download |
|
import types |
|
|
|
from gradio.components import Button |
|
from gradio.events import Dependency, EventListenerMethod |
|
|
|
|
import sys |
|
|
|
from .base_engine import BaseEngine |
|
|
|
|
|
|
|
from ..configs import ( |
|
MODEL_PATH, |
|
DEFAULT_CHAT_TEMPLATE, |
|
N_CTX, |
|
N_GPU_LAYERS, |
|
IMAGE_TOKEN, |
|
IMAGE_TOKEN_INTERACTIVE, |
|
IMAGE_TOKEN_LENGTH, |
|
MAX_PACHES, |
|
) |
|
|
|
from .llama_cpp_engine import ( |
|
encode_tokenize, |
|
LlamaCppEngine, |
|
) |
|
|
|
|
|
|
|
|
|
|
|
import base64 |
|
|
|
def image_to_base64_data_uri(file_path):
    """Read an image file and return its contents as a base64 data URI (assumes PNG content)."""
    with open(file_path, "rb") as img_file:
        base64_data = base64.b64encode(img_file.read()).decode('utf-8')
        return f"data:image/png;base64,{base64_data}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def llava_15_chat_handler_call( |
|
self, |
|
*, |
|
llama: Any, |
|
|
|
prompt: Union[str, List[int]], |
|
image_data_uris: Optional[List[Any]] = None, |
|
    image_token: Optional[str] = None,
|
functions: Optional[List[Any]] = None, |
|
function_call: Optional[Any] = None, |
|
tools: Optional[List[Any]] = None, |
|
tool_choice: Optional[Any] = None, |
|
temperature: float = 0.2, |
|
top_p: float = 0.95, |
|
top_k: int = 40, |
|
min_p: float = 0.05, |
|
typical_p: float = 1.0, |
|
stream: bool = False, |
|
stop: Optional[Union[str, List[str]]] = [], |
|
response_format: Optional[ |
|
Any |
|
] = None, |
|
max_tokens: Optional[int] = None, |
|
presence_penalty: float = 0.0, |
|
frequency_penalty: float = 0.0, |
|
repeat_penalty: float = 1.1, |
|
tfs_z: float = 1.0, |
|
mirostat_mode: int = 0, |
|
mirostat_tau: float = 5.0, |
|
mirostat_eta: float = 0.1, |
|
model: Optional[str] = None, |
|
logits_processor: Optional[Any] = None, |
|
grammar: Optional[Any] = None, |
|
**kwargs, |
|
): |
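    """
    Replacement for Llava15ChatHandler.__call__: instead of an OpenAI-style
    `messages` list it takes a pre-rendered `prompt` string containing
    `image_token` placeholders plus matching `image_data_uris`, evaluates the
    interleaved text/image segments, and delegates to `llama.create_completion`.
    """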
|
from llama_cpp.llama_chat_format import ( |
|
ctypes, |
|
suppress_stdout_stderr, |
|
) |
|
assert ( |
|
llama.context_params.logits_all is True |
|
) |
|
assert self.clip_ctx is not None |
|
|
|
assert image_token is not None |
|
prompt_parts = prompt.split(image_token) |
|
|
|
assert len(prompt_parts) == len(image_data_uris) + 1, f'invalid {len(prompt_parts)=} != {len(image_data_uris)=}' |
|
llama.reset() |
|
prefix = prompt_parts[0] |
|
remaining_texts = prompt_parts[1:] |
|
|
llama.eval(llama.tokenize(prefix.encode("utf8"), add_bos=True)) |
|
for index, (image_uri, prompt_p) in enumerate(zip(image_data_uris, remaining_texts)): |
|
image_bytes = self.load_image(image_uri) |
|
import array |
|
data_array = array.array("B", image_bytes) |
|
c_ubyte_ptr = ( |
|
ctypes.c_ubyte * len(data_array) |
|
).from_buffer(data_array) |
|
with suppress_stdout_stderr(disable=self.verbose): |
|
embed = ( |
|
self._llava_cpp.llava_image_embed_make_with_bytes( |
|
self.clip_ctx, |
|
llama.context_params.n_threads, |
|
c_ubyte_ptr, |
|
len(image_bytes), |
|
) |
|
) |
|
try: |
|
n_past = ctypes.c_int(llama.n_tokens) |
|
n_past_p = ctypes.pointer(n_past) |
|
with suppress_stdout_stderr(disable=self.verbose): |
|
self._llava_cpp.llava_eval_image_embed( |
|
llama.ctx, |
|
embed, |
|
llama.n_batch, |
|
n_past_p, |
|
) |
|
assert llama.n_ctx() >= n_past.value |
|
llama.n_tokens = n_past.value |
|
finally: |
|
with suppress_stdout_stderr(disable=self.verbose): |
|
self._llava_cpp.llava_image_embed_free(embed) |
|
|
|
llama.eval(llama.tokenize(prompt_p.encode("utf8"), add_bos=False)) |
|
assert llama.n_ctx() >= llama.n_tokens |
|
|
|
prompt = llama.input_ids[: llama.n_tokens].tolist() |
|
|
|
return llama.create_completion( |
|
prompt=prompt, |
|
temperature=temperature, |
|
top_p=top_p, |
|
top_k=top_k, |
|
min_p=min_p, |
|
typical_p=typical_p, |
|
stream=stream, |
|
stop=stop, |
|
max_tokens=max_tokens, |
|
presence_penalty=presence_penalty, |
|
frequency_penalty=frequency_penalty, |
|
repeat_penalty=repeat_penalty, |
|
tfs_z=tfs_z, |
|
mirostat_mode=mirostat_mode, |
|
mirostat_tau=mirostat_tau, |
|
mirostat_eta=mirostat_eta, |
|
model=model, |
|
logits_processor=logits_processor, |
|
grammar=grammar, |
|
) |
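
# Hedged usage sketch (assumes `handler` is the patched Llava15ChatHandler instance
# and `llm` the loaded Llama model; "example.png" is a hypothetical path):
#
#     out = handler.__call__(
#         llama=llm,
#         prompt=f"USER: {IMAGE_TOKEN}\nWhat is in the picture? ASSISTANT:",
#         image_data_uris=[image_to_base64_data_uri("example.png")],
#         image_token=IMAGE_TOKEN,
#         max_tokens=128,
#         stream=False,
#     )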
|
|
|
|
|
|
|
class LlavaLlamaCppEngine(LlamaCppEngine): |
|
""" |
|
Still in development, expect BUGS |
|
|
|
ERROR: could not know why |
|
objc[61055]: Class GGMLMetalClass is implemented in both miniconda3/envs/native/lib/python3.12/site-packages/llama_cpp/libllama.dylib (0x12cb40290) and miniconda3/envs/native/lib/python3.12/site-packages/llama_cpp/libllava.dylib (0x12d9c8290). One of the two will be used. Which one is undefined. |
|
|
|
""" |
|
@property |
|
def image_token(self): |
|
return IMAGE_TOKEN |
|
|
|
    def get_multimodal_tokens(self, full_prompt, image_paths=None):
        num_tokens = len(self.tokenizer.encode(full_prompt))
        # Each image contributes a fixed upper-bound token budget.
        num_tokens += len(image_paths or []) * IMAGE_TOKEN_LENGTH * MAX_PACHES
        return num_tokens
|
|
|
def load_model(self): |
|
|
|
from llama_cpp import Llama |
|
from llama_cpp.llama_chat_format import Llava15ChatHandler |
|
        model_dir = os.path.dirname(MODEL_PATH)
        # The CLIP projector weights (mmproj.bin) are expected to sit next to the GGUF model.
        self.chat_handler = Llava15ChatHandler(clip_model_path=os.path.join(model_dir, "mmproj.bin"))

        # Bind the prompt-based handler onto this instance; it is invoked explicitly via
        # `self.chat_handler.__call__(...)` because `self.chat_handler(...)` would still
        # dispatch to the class-level Llava15ChatHandler.__call__.
        self.chat_handler.__call__ = types.MethodType(llava_15_chat_handler_call, self.chat_handler)
|
|
|
self.model_path = MODEL_PATH |
|
self._model = Llama( |
|
model_path=self.model_path, |
|
n_gpu_layers=N_GPU_LAYERS, |
|
|
|
chat_handler=self.chat_handler, |
|
n_ctx=N_CTX, |
|
logits_all=True, |
|
) |
|
self._tokenizer = self._model |
|
self._model.encode = types.MethodType(encode_tokenize, self._model) |
|
print(f'Load model: {self.model_path=} | {N_GPU_LAYERS=} | {N_CTX=}') |
|
|
|
def generate_yield_string(self, prompt, temperature, max_tokens, stop_strings: Optional[Tuple[str]] = None, **kwargs): |
|
image_paths = kwargs.get("image_paths", []) |
|
|
|
image_data_uris = [ |
|
image_to_base64_data_uri(ip) |
|
for ip in image_paths |
|
] |
|
|
|
stop_strings = list(stop_strings) if stop_strings is not None else [] |
|
stop_strings = list(set(stop_strings + ["</s>", "<|im_end|>"])) |
|
|
|
        # Call the instance-level patched handler explicitly; the `obj(...)` call syntax
        # would bypass the instance attribute and hit the original class __call__.
        generator = self.chat_handler.__call__(
|
prompt=prompt, |
|
image_data_uris=image_data_uris, |
|
image_token=self.image_token, |
|
max_tokens=max_tokens, |
|
temperature=temperature, |
|
stop=stop_strings, |
|
stream=True, |
|
) |
|
response = "" |
|
num_tokens = len(self.tokenizer.encode(prompt)) |
|
for g in generator: |
|
response += g['choices'][0]['text'] |
|
yield response, num_tokens |
|
|
|
        if response:
|
num_tokens = len(self.tokenizer.encode(prompt + response)) |
|
yield response, num_tokens |
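
    # Hedged usage sketch (not executed; paths and prompt format are illustrative):
    #
    #     engine = LlavaLlamaCppEngine()
    #     engine.load_model()
    #     for partial, n_tokens in engine.generate_yield_string(
    #         prompt=f"USER: {IMAGE_TOKEN}\nDescribe this image. ASSISTANT:",
    #         temperature=0.2,
    #         max_tokens=256,
    #         image_paths=["example.jpg"],
    #     ):
    #         pass  # `partial` is the accumulated response so far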
|
|
|
|
|
""" |
|
|
|
export MODEL_PATH |
|
BACKEND=llama_cpp |
|
MODEL_PATH=/Users/nguyenxuanphi/Desktop/projects/cache/seallms/SeaLLMs/SeaLLM-7B-v2-gguf/seallm-v2.chatml.Q4_K_M.gguf |
|
N_CTX=4096 |
|
python app.py |
|
|
|
|
|
export BACKEND=llava_llama_cpp |
|
export MODEL_PATH=/Users/nguyenxuanphi/Desktop/projects/cache/llava/llava-1.5/ggml-model-q4_k.gguf |
|
export N_CTX=4096 |
|
export IMAGE_TOKEN="<image>" |
|
python app.py |
|
|
|
|
|
""" |