import json
import re
from pathlib import Path

from nexa.gguf.llama.llama import Llama
from txtai import Embeddings

import modules.async_worker as worker
from modules.util import TimeIt, load_file_from_url, url_to_filename
from shared import path_manager, settings


def llama_names():
    # Collect (display name, file path) pairs for every *.txt persona under llamas/.
    # The first line of each file is used as the display name.
    names = []
    folder_path = Path("llamas")
    for path in folder_path.rglob("*"):
        if path.suffix.lower() == ".txt":
            with open(path, "r", encoding="utf-8") as f:
                name = f.readline().strip()
            names.append((name, str(path)))
    names.sort(key=lambda x: x[0].casefold())
    return names
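
# Expected layout of a llamas/*.txt file (inferred from llama_names() above and
# run_llama() below, which read the first line as the display name and the rest
# as the system prompt); the example contents below are only an illustration:
#
#   Prompt Rewriter
#   You are a prompt engineer. Rewrite the user's short prompt into a detailed
#   image-generation prompt. Reply with the rewritten prompt only.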


def run_llama(system_file, prompt):
    """Run one chat completion over `prompt` and return the model's reply.

    The system prompt is read from `system_file` (first line is the persona
    name, the rest is the prompt). If `prompt` itself starts with a
    "system: ..." line followed by a blank line, that inline system prompt is
    used instead. On failure the prompt is returned unchanged.
    """
    name = None
    sys_pat = "system:.*\n\n"
    system = re.match(sys_pat, prompt, flags=re.M | re.I)
    if system is not None:  # Llama system prompt provided in the UI prompt
        name = "Llama"
        system_prompt = re.sub("^[^:]*: *", "", system.group(0), flags=re.M | re.I)
        prompt = re.sub(sys_pat, "", prompt)
    else:
        try:
            with open(system_file, "r", encoding="utf-8") as file:
                name = file.readline().strip()
                system_prompt = file.read().strip()
        except OSError:
            print(f"LLAMA ERROR: Could not open file {system_file}")
            return prompt

    llama = pipeline()
    llama.load_base_model()

    with TimeIt(""):
        print(f"# System:\n{system_prompt.strip()}\n")
        print(f"# User:\n{prompt.strip()}\n")
        print(f"# {name}: (Thinking...)")
        try:
            res = llama.llm.create_chat_completion(
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": prompt},
                ],
                repeat_penalty=1.18,
            )["choices"][0]["message"]["content"]
        except Exception as e:
            print(f"LLAMA ERROR: {e}")
            res = prompt
        print(f"{res.strip()}\n")

    # Free the model explicitly so its buffers are released right away.
    llama.llm._stack.close()
    llama.llm.close()
    return res


class pipeline:
    pipeline_type = ["llama"]

    llm = None
    embeddings = None
    embeddings_hash = ""

    def parse_gen_data(self, gen_data):
        return gen_data

    def load_base_model(self):
        # Load a local GGUF file if "llama_localfile" is set in the settings,
        # otherwise pull the configured repo/file from the Hugging Face Hub.
        localfile = settings.default_settings.get("llama_localfile", None)
        repo = settings.default_settings.get(
            "llama_repo", "hugging-quants/Llama-3.2-3B-Instruct-Q8_0-GGUF"
        )
        file = settings.default_settings.get("llama_file", "*q8_0.gguf")

        with TimeIt("Load LLM"):
            if localfile is None:
                print(f"Loading {repo}")
                self.llm = Llama.from_pretrained(
                    repo_id=repo,
                    filename=file,
                    verbose=False,
                    n_ctx=4096,
                    n_gpu_layers=-1,
                    offload_kqv=True,
                    flash_attn=True,
                )
            else:
                llm_path = path_manager.get_folder_file_path(
                    "llm",
                    localfile,
                    default=Path(path_manager.model_paths["llm_path"]) / localfile,
                )
                print(f"Loading {localfile}")
                self.llm = Llama(
                    model_path=str(llm_path),
                    verbose=False,
                    n_ctx=4096,
                    n_gpu_layers=-1,
                    offload_kqv=True,
                    flash_attn=True,
                )
        # A freshly loaded model invalidates any previously built embeddings index.
        self.embeddings = None
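
    # Settings sketch (assumed shape of shared.settings.default_settings, based on
    # the get() calls above; the actual defaults live in the app's settings file):
    #
    #   "llama_localfile": None   # or a GGUF filename under the configured llm path
    #   "llama_repo": "hugging-quants/Llama-3.2-3B-Instruct-Q8_0-GGUF"
    #   "llama_file": "*q8_0.gguf"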

    def index_source(self, source):
        # source is a (type, value) pair: ("url", <url>) or ("text", <text>).
        if self.embeddings is None:
            self.embeddings = Embeddings(content=True)
            self.embeddings.initindex(reindex=True)
        match source[0]:
            case "url":
                print(f"Read {source[1]}")
                filename = load_file_from_url(
                    source[1],
                    model_dir="cache/embeds",
                    progress=True,
                    file_name=url_to_filename(source[1]),
                )
                with open(filename, "r", encoding="utf-8") as file:
                    data = file.read()
                # Chunk the document: markdown on headings, plain text on blank lines.
                if source[1].endswith(".md"):
                    data = data.split("\n#")
                elif source[1].endswith(".txt"):
                    data = data.split("\n\n")
            case "text":
                data = source[1]
            case _:
                print(f"WARNING: Unknown embedding type {source[0]}")
                return
        if data:
            self.embeddings.upsert(data)

    def process(self, gen_data):
        # Push the current history to the UI right away as a preview.
        worker.add_result(
            gen_data["task_id"],
            "preview",
            gen_data["history"],
        )

        if self.llm is None:
            self.load_base_model()

        # Load embeds?
        # FIXME should dump the entire gen_data["embed"] to index_source() and have it sort it out
        embed = json.loads(gen_data["embed"])
        if self.embeddings_hash != str(embed):
            # The embedding sources changed; throw away the old index.
            self.embeddings_hash = str(embed)
            self.embeddings = None
        if embed:
            if not self.embeddings:  # The chatbot has sources to index but no index yet.
                for source in embed:
                    self.index_source(source)
        else:
            self.embeddings = None

        system_prompt = gen_data["system"]
        h = gen_data["history"]
        if self.embeddings:
            # Append the best-matching indexed chunks for the latest user message.
            q = h[-1]["content"]
            context = "This is some context that will help you answer the question:\n"
            for data in self.embeddings.search(q, limit=3):
                # if data["score"] >= 0.5:
                context += data["text"] + "\n\n"
            system_prompt += context

        chat = [{"role": "system", "content": system_prompt}] + h[-3:]  # Keep just the last 3 messages

        print("Thinking...")
        with TimeIt("LLM thinking"):
            response = self.llm.create_chat_completion(
                messages=chat,
                max_tokens=1024,
                stream=True,
            )
            # Stream the reply and keep updating the UI preview as it grows.
            text = ""
            for chunk in response:
                delta = chunk["choices"][0]["delta"]
                if "content" in delta:
                    for token in delta["content"]:
                        text += token
                        worker.add_result(
                            gen_data["task_id"],
                            "preview",
                            gen_data["history"] + [{"role": "assistant", "content": text}],
                        )
        return text
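

# Minimal smoke-test sketch (an assumption, not part of the UI flow): run this
# module directly inside the app environment to list the installed personas and
# try run_llama() on the first one with a hypothetical prompt.
if __name__ == "__main__":
    personas = llama_names()
    for display_name, path in personas:
        print(f"{display_name}: {path}")
    if personas:
        _, system_file = personas[0]
        print(run_llama(system_file, "a cat wearing a spacesuit"))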