# Commit 3b34d45: Print cmd line when executing llama-server
from datetime import datetime, timezone
import socket
import shlex
import subprocess
import os
from shutil import which
from llama_index.core import (
Settings,
SimpleDirectoryReader,
VectorStoreIndex,
load_index_from_storage,
StorageContext,
)
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.postprocessor import SimilarityPostprocessor
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.llms.mock import MockLLM
from llama_index.core.node_parser import MarkdownNodeParser
from gnews import GNews
from googlenewsdecoder import gnewsdecoder


def get_current_time():
return datetime.now(timezone.utc).strftime("%Y-%m-%d_%H-%M-%S")
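
# Example: get_current_time() might return "2025-01-31_23-59-59" (a
# filename-safe UTC timestamp; the value shown is hypothetical).
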
def check_server(address, port):
# Create a TCP socket
s = socket.socket()
print(f"Attempting to connect to {address} on port {port}", flush=True)
try:
s.connect((address, port))
print(f"Connected to {address} on port {port}", flush=True)
return True
except socket.error as e:
print(f"Connection to {address} on port {port} failed:{e}", flush=True)
return False
finally:
s.close()
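
# Example usage (hypothetical host/port, not taken from this file): wait for a
# local llama-server to come up before sending requests.
#     if check_server("127.0.0.1", 8080):
#         print("llama-server is reachable", flush=True)
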
def get_valid_filename(filename: str):
invalid_chars = frozenset('<>:"/\\|?*')
return "".join("-" if c in invalid_chars else c for c in filename)
###############################################################################
# Example usage: restart_exec(llama_server_path, "server is listening on")
def restart_exec(shell_exec_path_with_args, success_text):
print(
f"restart_exec(): shell_exec_path_with_args: [{shell_exec_path_with_args}]",
flush=True,
)
    # The first token is supposed to be an absolute path to the executable!
    exec_path = shlex.split(shell_exec_path_with_args)[0]
exec_name = os.path.basename(exec_path)
exec_folder = os.path.dirname(exec_path)
# Start a new llama-server process.
exec_proc = subprocess.Popen(
shell_exec_path_with_args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
shell=True,
cwd=f"{exec_folder}",
)
    # Make both pipes non-blocking so readline() cannot stall on a quiet stream.
    os.set_blocking(exec_proc.stdout.fileno(), False)  # type: ignore
    os.set_blocking(exec_proc.stderr.fileno(), False)  # type: ignore
started = False
total_lines = []
while exec_proc.poll() is None:
output = exec_proc.stdout.readline() # type: ignore
error = exec_proc.stderr.readline() # type: ignore
        # Merge whatever arrived on either stream into a single stripped line.
        current_line = (output.strip() if output else "") + (
            error.strip() if error else ""
        )
if len(current_line) > 0:
print(f"restart_exec(): Current CMD Line: {current_line}")
total_lines.append(current_line)
if success_text in current_line:
print(
f"(っ◕‿◕)っ βž” Found the success text [{success_text}] at [{get_current_time()}].",
flush=True,
)
started = True
break
if not started:
print(
f"Failed to start the {exec_name}. returncode: {exec_proc.returncode}",
flush=True,
)
print(f"Last logs:\n{'\n'.join(total_lines[-10:])}", flush=True)
return started
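
# Example usage (hypothetical command line): restart_exec() returns a bool, so
# callers should check it before talking to the server.
#     ok = restart_exec(
#         "/opt/llama.cpp/llama-server -m model.gguf --port 8080",
#         "server is listening on",
#     )
#     if not ok:
#         raise RuntimeError("llama-server failed to start")
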
def use_huggingface_cli_to_download_model(model_name, local_dir):
print(
f"use_huggingface_cli_to_download_model(): model_name: [{model_name}]; local_dir: [{local_dir}]; "
f"HF_TOKEN here: [{os.environ['HF_TOKEN'][0:5]}...{os.environ['HF_TOKEN'][-5:]}]",
flush=True,
)
if huggingface_cli := which("huggingface-cli"):
subprocess.run(
[huggingface_cli, "login", "--token", os.environ["HF_TOKEN"]], cwd=local_dir
)
print(f"Downloading: Start! [{get_current_time()}]", flush=True)
subprocess.run(
[
huggingface_cli,
"download",
"--repo-type=model",
"tnt306/finetuned-Qwen2.5-7B-Instruct-1M",
f"{model_name}.gguf",
"--local-dir",
local_dir,
],
cwd=local_dir,
)
print(f"Downloading: End! [{get_current_time()}]", flush=True)
else:
        raise RuntimeError(
            f"Model {model_name} not found and huggingface-cli is not installed; cannot download it."
        )
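
# Example usage (hypothetical model name and folder; HF_TOKEN must be set in
# the environment):
#     use_huggingface_cli_to_download_model("model-q4_k_m", "/data/models")
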
###############################################################################
def get_latest_news(topic: str, number_of_news: int = 3) -> str:
"""
Given the topic, get the latest news from the internet about the topic.
Args:
topic (str): The topic to search for.
number_of_news (int): the number of pieces of news to search for. Default: 3.
Returns:
str: The latest news about the topic.
"""
print(
f"(っ◕‿◕)っ Tool - get_latest_news(): topic = {topic}; number_of_news = {number_of_news}",
flush=True,
)
google_news = GNews(language="en", country="US", period="1d", max_results=10)
result = google_news.get_news(topic)
returned_news = ""
    for idx, news in enumerate(result[:number_of_news], start=1):  # type: ignore
decoded_url = gnewsdecoder(news["url"], interval=1)
if decoded_url.get("status"):
real_url = decoded_url["decoded_url"]
else:
real_url = news["url"]
returned_news += (
f"# News Article {idx}\n"
f"Title: {news['title']}\n"
f"Source: {news['publisher']['title']}\n"
f"URL: [Click here]({real_url})\n"
"\n"
)
print(
f"(っ◕‿◕)っ Tool - get_latest_news(): returned_news = \n{returned_news}\n",
flush=True,
)
return returned_news
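
# Example usage (hypothetical topic):
#     print(get_latest_news("artificial intelligence", number_of_news=2))
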
###############################################################################
def create_vector_store_index(
folder_path: str, persist_dir: str = "vector_store_index"
):
Settings.llm = MockLLM() # No need for real LLM.
Settings.embed_model = HuggingFaceEmbedding(
model_name="BAAI/bge-small-en-v1.5", device="cpu"
)
Settings.chunk_size = 1024 # default
Settings.chunk_overlap = 20 # default
os.environ["TIKTOKEN_CACHE_DIR"] = (
f"{os.path.dirname(os.path.realpath(__file__))}/tiktoken_cache"
)
documents = SimpleDirectoryReader(folder_path).load_data()
parser = MarkdownNodeParser(include_metadata=True)
vector_store_index = VectorStoreIndex.from_documents(
documents, transformations=[parser]
)
vector_store_index.storage_context.persist(persist_dir=persist_dir)
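
# Example usage (hypothetical folder of markdown files):
#     create_vector_store_index("./docs", persist_dir="vector_store_index")
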
def get_query_from_vector_store_index(
persist_dir: str = "vector_store_index", top_k: int = 2
):
Settings.llm = MockLLM()
Settings.embed_model = HuggingFaceEmbedding(
model_name="BAAI/bge-small-en-v1.5", device="cpu"
)
os.environ["TIKTOKEN_CACHE_DIR"] = (
f"{os.path.dirname(os.path.realpath(__file__))}/tiktoken_cache"
)
storage_context = StorageContext.from_defaults(persist_dir=persist_dir)
index = load_index_from_storage(storage_context)
retriever = VectorIndexRetriever(
index=index,
similarity_top_k=top_k,
)
query_engine = RetrieverQueryEngine(
retriever=retriever,
node_postprocessors=[SimilarityPostprocessor(similarity_cutoff=0.5)],
)
def get_rag_context(query: str, separator: str = "\n\n") -> str:
print(f"(っ◕‿◕)っ get_rag_context(): query = {query}", flush=True)
response = query_engine.query(query)
res = separator.join([node.text for node in response.source_nodes])
print(f"(っ◕‿◕)っ get_rag_context(): res = {res}", flush=True)
return res
return get_rag_context
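
# A minimal end-to-end sketch (hypothetical paths and query, assuming "./docs"
# holds markdown documents): build the index once, then query it for RAG context.
if __name__ == "__main__":
    create_vector_store_index("./docs", persist_dir="vector_store_index")
    get_rag_context = get_query_from_vector_store_index(
        persist_dir="vector_store_index", top_k=2
    )
    print(get_rag_context("What topics do the documents cover?"))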