import re
import json
from pathlib import Path

from nexa.gguf.llama.llama import Llama
from txtai import Embeddings

import modules.async_worker as worker
from modules.util import TimeIt, url_to_filename, load_file_from_url
from shared import path_manager, settings


def llama_names():
    # Collect (display name, path) pairs for every system-prompt file under "llamas".
    # The first line of each .txt file is used as its display name.
    names = []
    folder_path = Path("llamas")
    for path in folder_path.rglob("*"):
        if path.suffix.lower() == ".txt":
            with open(path, "r", encoding="utf-8") as f:
                name = f.readline().strip()
            names.append((name, str(path)))
    names.sort(key=lambda x: x[0].casefold())
    return names


def run_llama(system_file, prompt):
    name = None

    # A system prompt can be embedded at the top of the ui-prompt as "system: ...".
    sys_pat = r"system:.*\n\n"
    system = re.match(sys_pat, prompt, flags=re.M | re.I)
    if system is not None:
        # Llama system-prompt provided in the ui-prompt
        name = "Llama"
        system_prompt = re.sub(r"^[^:]*: *", "", system.group(0), flags=re.M | re.I)
        prompt = re.sub(sys_pat, "", prompt)
    else:
        # Otherwise read the system prompt from the given file.
        try:
            with open(system_file, "r", encoding="utf-8") as file:
                name = file.readline().strip()
                system_prompt = file.read().strip()
        except Exception:
            print(f"LLAMA ERROR: Could not open file {system_file}")
            return prompt

    llama = pipeline()
    llama.load_base_model()

    with TimeIt(""):
        print(f"# System:\n{system_prompt.strip()}\n")
        print(f"# User:\n{prompt.strip()}\n")
        print(f"# {name}: (Thinking...)")
        try:
            res = llama.llm.create_chat_completion(
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": prompt},
                ],
                repeat_penalty=1.18,
            )["choices"][0]["message"]["content"]
        except Exception as e:
            print(f"LLAMA ERROR: {e}")
            res = prompt

    print(f"{res.strip()}\n")

    # Release the model before returning.
    llama.llm._stack.close()
    llama.llm.close()

    return res


class pipeline:
    pipeline_type = ["llama"]

    llm = None
    embeddings = None
    embeddings_hash = ""

    def parse_gen_data(self, gen_data):
        return gen_data

    def load_base_model(self):
        localfile = settings.default_settings.get("llama_localfile", None)
        repo = settings.default_settings.get(
            "llama_repo", "hugging-quants/Llama-3.2-3B-Instruct-Q8_0-GGUF"
        )
        file = settings.default_settings.get("llama_file", "*q8_0.gguf")

        with TimeIt("Load LLM"):
            if localfile is None:
                print(f"Loading {repo}")
                self.llm = Llama.from_pretrained(
                    repo_id=repo,
                    filename=file,
                    verbose=False,
                    n_ctx=4096,
                    n_gpu_layers=-1,
                    offload_kqv=True,
                    flash_attn=True,
                )
            else:
                llm_path = path_manager.get_folder_file_path(
                    "llm",
                    localfile,
                    default=Path(path_manager.model_paths["llm_path"]) / localfile,
                )
                print(f"Loading {localfile}")
                self.llm = Llama(
                    model_path=str(llm_path),
                    verbose=False,
                    n_ctx=4096,
                    n_gpu_layers=-1,
                    offload_kqv=True,
                    flash_attn=True,
                )
        self.embeddings = None

    def index_source(self, source):
        # source is a (type, payload) pair, e.g. ("url", "https://...") or ("text", [...]).
        if self.embeddings is None:
            self.embeddings = Embeddings(content=True)
            self.embeddings.initindex(reindex=True)

        match source[0]:
            case "url":
                print(f"Read {source[1]}")
                filename = load_file_from_url(
                    source[1],
                    model_dir="cache/embeds",
                    progress=True,
                    file_name=url_to_filename(source[1]),
                )
                with open(filename, "r", encoding="utf-8") as file:
                    data = file.read()
                # Split documents into chunks before indexing.
                if source[1].endswith(".md"):
                    data = data.split("\n#")
                elif source[1].endswith(".txt"):
                    data = data.split("\n\n")
            case "text":
                data = source[1]
            case _:
                print(f"WARNING: Unknown embedding type {source[0]}")
                return

        if data:
            self.embeddings.upsert(data)

    def process(self, gen_data):
        worker.add_result(
            gen_data["task_id"],
            "preview",
            gen_data["history"],
        )

        if self.llm is None:
            self.load_base_model()

        # load embeds?
        # FIXME: should dump the entire gen_data["embed"] to index_source() and have it sort it out
        embed = json.loads(gen_data["embed"])
        if self.embeddings_hash != str(embed):
            # The embedding sources changed; drop the old index.
            self.embeddings_hash = str(embed)
            self.embeddings = None
        if embed:
            if not self.embeddings:
                # If the chatbot has embeddings to index, make sure they are indexed.
                for source in embed:
                    self.index_source(source)
        else:
            self.embeddings = None

        system_prompt = gen_data["system"]
        h = gen_data["history"]

        if self.embeddings:
            # Augment the system prompt with context retrieved for the latest user message.
            q = h[-1]["content"]
            context = "This is some context that will help you answer the question:\n"
            for data in self.embeddings.search(q, limit=3):
                # if data["score"] >= 0.5:
                context += data["text"] + "\n\n"
            system_prompt += context

        # Keep just the last three messages of the history.
        chat = [{"role": "system", "content": system_prompt}] + h[-3:]

        print("Thinking...")
        with TimeIt("LLM thinking"):
            response = self.llm.create_chat_completion(
                messages=chat,
                max_tokens=1024,
                stream=True,
            )
            text = ""
            for chunk in response:
                delta = chunk["choices"][0]["delta"]
                if "content" in delta:
                    tokens = delta["content"]
                    for token in tokens:
                        text += token
                        # Stream the partial reply back to the UI as a preview.
                        worker.add_result(
                            gen_data["task_id"],
                            "preview",
                            gen_data["history"]
                            + [{"role": "assistant", "content": text}],
                        )

        return text
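

# ---------------------------------------------------------------------------
# Hypothetical usage sketch, not part of the pipeline itself: it shows how
# run_llama() could be driven by the system-prompt files that llama_names()
# discovers under "llamas/". The example prompt below is an assumption made
# purely for illustration.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    available = llama_names()
    for display_name, system_file in available:
        print(f"Available system prompt: {display_name} ({system_file})")

    if available:
        example_prompt = "Describe a cozy cabin deep in a snowy forest."
        # Runs the first discovered system prompt against the example prompt.
        result = run_llama(available[0][1], example_prompt)
        print(result)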