from datetime import datetime, timezone
import socket
import shlex
import subprocess
import os
from shutil import which

from llama_index.core import (
    Settings,
    SimpleDirectoryReader,
    VectorStoreIndex,
    load_index_from_storage,
    StorageContext,
)
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.postprocessor import SimilarityPostprocessor
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.llms.mock import MockLLM
from llama_index.core.node_parser import MarkdownNodeParser

from gnews import GNews
from googlenewsdecoder import gnewsdecoder


def get_current_time():
    return datetime.now(timezone.utc).strftime("%Y-%m-%d_%H-%M-%S")


def check_server(address, port):
    """Return True if a TCP connection to (address, port) succeeds, else False."""
    # Create a TCP socket
    s = socket.socket()
    print(f"Attempting to connect to {address} on port {port}", flush=True)
    try:
        s.connect((address, port))
        print(f"Connected to {address} on port {port}", flush=True)
        return True
    except socket.error as e:
        print(f"Connection to {address} on port {port} failed: {e}", flush=True)
        return False
    finally:
        s.close()
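

# A minimal usage sketch: poll check_server() until a local server accepts TCP
# connections. The host, port, and retry count are assumptions for illustration.
def example_wait_for_server(address="127.0.0.1", port=8080, retries=30):
    import time

    for _ in range(retries):
        if check_server(address, port):
            return True
        time.sleep(2)  # wait before retrying
    return False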


def get_valid_filename(filename: str):
    invalid_chars = frozenset('<>:"/\\|?*')
    return "".join("-" if c in invalid_chars else c for c in filename)


###############################################################################
# Example usage: restart_exec(llama_server_path, "server is listening on")
def restart_exec(shell_exec_path_with_args, success_text):
    """Start a shell command and watch its output until `success_text` appears.

    Returns True if the success text was seen before the process exited.
    """
    print(
        f"restart_exec(): shell_exec_path_with_args: [{shell_exec_path_with_args}]",
        flush=True,
    )
    exec_path = shlex.split(shell_exec_path_with_args)[0]  # expected to be an absolute path
    exec_name = os.path.basename(exec_path)
    exec_folder = os.path.dirname(exec_path)
    # Start a new llama-server process.
    exec_proc = subprocess.Popen(
        shell_exec_path_with_args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        shell=True,
        cwd=exec_folder,
    )
    # Make both pipes non-blocking so readline() cannot stall the watch loop.
    os.set_blocking(exec_proc.stdout.fileno(), False)  # type: ignore
    os.set_blocking(exec_proc.stderr.fileno(), False)  # type: ignore
    started = False
    total_lines = []
    while exec_proc.poll() is None:
        output = exec_proc.stdout.readline()  # type: ignore
        error = exec_proc.stderr.readline()  # type: ignore
        current_line = (output.strip() if output else "") + (error.strip() if error else "")
        if len(current_line) > 0:
            print(f"restart_exec(): Current CMD Line: {current_line}")
            total_lines.append(current_line)
            if success_text in current_line:
                print(
                    f"Found the success text [{success_text}] at [{get_current_time()}].",
                    flush=True,
                )
                started = True
                break
    if not started:
        print(
            f"Failed to start the {exec_name}. returncode: {exec_proc.returncode}",
            flush=True,
        )
        last_logs = "\n".join(total_lines[-10:])
        print(f"Last logs:\n{last_logs}", flush=True)
    return started
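

# A hypothetical usage sketch for restart_exec(). The binary path, model path,
# port, and the "server is listening on" marker are assumptions (the marker comes
# from the example comment above); adjust them to the actual llama-server setup.
def example_start_llama_server():
    llama_server_path = "/app/llama.cpp/llama-server"  # assumed absolute path
    model_path = "/app/models/model.gguf"  # assumed GGUF file
    cmd = f"{llama_server_path} -m {model_path} --host 0.0.0.0 --port 8080"
    return restart_exec(cmd, "server is listening on")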


def use_huggingface_cli_to_download_model(model_name, local_dir):
    print(
        f"use_huggingface_cli_to_download_model(): model_name: [{model_name}]; local_dir: [{local_dir}]; "
        f"HF_TOKEN here: [{os.environ['HF_TOKEN'][0:5]}...{os.environ['HF_TOKEN'][-5:]}]",
        flush=True,
    )
    if huggingface_cli := which("huggingface-cli"):
        subprocess.run(
            [huggingface_cli, "login", "--token", os.environ["HF_TOKEN"]], cwd=local_dir
        )
        print(f"Downloading: Start! [{get_current_time()}]", flush=True)
        subprocess.run(
            [
                huggingface_cli,
                "download",
                "--repo-type=model",
                "tnt306/finetuned-Qwen2.5-7B-Instruct-1M",
                f"{model_name}.gguf",
                "--local-dir",
                local_dir,
            ],
            cwd=local_dir,
        )
        print(f"Downloading: End! [{get_current_time()}]", flush=True)
    else:
        raise Exception(
            f"huggingface-cli is not installed; cannot download model {model_name}."
        )
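

# A hypothetical usage sketch. The GGUF file stem and download directory are
# assumptions; HF_TOKEN must be set in the environment, as the function expects.
def example_download_model():
    model_name = "finetuned-Qwen2.5-7B-Instruct-1M-Q4_K_M"  # assumed file stem in the repo
    local_dir = "/app/models"  # assumed download directory
    os.makedirs(local_dir, exist_ok=True)
    if not os.path.exists(os.path.join(local_dir, f"{model_name}.gguf")):
        use_huggingface_cli_to_download_model(model_name, local_dir)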


###############################################################################
def get_latest_news(topic: str, number_of_news: int = 3) -> str:
    """
    Given a topic, get the latest news about it from the internet.

    Args:
        topic (str): The topic to search for.
        number_of_news (int): The number of news items to return. Default: 3.

    Returns:
        str: The latest news about the topic, formatted as Markdown.
    """
    print(
        f"Tool - get_latest_news(): topic = {topic}; number_of_news = {number_of_news}",
        flush=True,
    )
    google_news = GNews(language="en", country="US", period="1d", max_results=10)
    result = google_news.get_news(topic)
    returned_news = ""
    for idx, news in enumerate(result[:number_of_news]):  # type: ignore
        # Google News returns redirect URLs; decode them back to the original article URL.
        decoded_url = gnewsdecoder(news["url"], interval=1)
        if decoded_url.get("status"):
            real_url = decoded_url["decoded_url"]
        else:
            real_url = news["url"]
        returned_news += (
            f"# News Article {idx}\n"
            f"Title: {news['title']}\n"
            f"Source: {news['publisher']['title']}\n"
            f"URL: [Click here]({real_url})\n"
            "\n"
        )
    print(
        f"Tool - get_latest_news(): returned_news = \n{returned_news}\n",
        flush=True,
    )
    return returned_news
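

# A hypothetical usage sketch tying the helpers together: fetch news for a topic
# and write the Markdown to a timestamped file. The topic is an arbitrary example.
def example_save_latest_news(topic="large language models"):
    news_markdown = get_latest_news(topic, number_of_news=3)
    filename = get_valid_filename(f"news_{topic}_{get_current_time()}.md")
    with open(filename, "w", encoding="utf-8") as f:
        f.write(news_markdown)
    return filename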


###############################################################################
def create_vector_store_index(
    folder_path: str, persist_dir: str = "vector_store_index"
):
    """Build a vector index over the documents in `folder_path` and persist it to disk."""
    Settings.llm = MockLLM()  # No need for a real LLM; only embeddings are used here.
    Settings.embed_model = HuggingFaceEmbedding(
        model_name="BAAI/bge-small-en-v1.5", device="cpu"
    )
    Settings.chunk_size = 1024  # default
    Settings.chunk_overlap = 20  # default
    os.environ["TIKTOKEN_CACHE_DIR"] = (
        f"{os.path.dirname(os.path.realpath(__file__))}/tiktoken_cache"
    )
    documents = SimpleDirectoryReader(folder_path).load_data()
    parser = MarkdownNodeParser(include_metadata=True)
    vector_store_index = VectorStoreIndex.from_documents(
        documents, transformations=[parser]
    )
    vector_store_index.storage_context.persist(persist_dir=persist_dir)


def get_query_from_vector_store_index(
    persist_dir: str = "vector_store_index", top_k: int = 2
):
    """Load a persisted index and return a `get_rag_context(query)` retrieval function."""
    Settings.llm = MockLLM()
    Settings.embed_model = HuggingFaceEmbedding(
        model_name="BAAI/bge-small-en-v1.5", device="cpu"
    )
    os.environ["TIKTOKEN_CACHE_DIR"] = (
        f"{os.path.dirname(os.path.realpath(__file__))}/tiktoken_cache"
    )
    storage_context = StorageContext.from_defaults(persist_dir=persist_dir)
    index = load_index_from_storage(storage_context)
    retriever = VectorIndexRetriever(
        index=index,
        similarity_top_k=top_k,
    )
    query_engine = RetrieverQueryEngine(
        retriever=retriever,
        node_postprocessors=[SimilarityPostprocessor(similarity_cutoff=0.5)],
    )

    def get_rag_context(query: str, separator: str = "\n\n") -> str:
        print(f"get_rag_context(): query = {query}", flush=True)
        response = query_engine.query(query)
        res = separator.join([node.text for node in response.source_nodes])
        print(f"get_rag_context(): res = {res}", flush=True)
        return res

    return get_rag_context
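

# A minimal end-to-end sketch of the RAG helpers above. The "knowledge" folder
# name and the example query are assumptions for illustration only.
def example_rag_pipeline():
    create_vector_store_index("knowledge", persist_dir="vector_store_index")  # assumed docs folder
    get_rag_context = get_query_from_vector_store_index(
        persist_dir="vector_store_index", top_k=2
    )
    # The returned closure retrieves the top-k chunks above the similarity cutoff
    # and joins them into a single context string for the prompt.
    return get_rag_context("What does the knowledge base say about the fine-tuned model?")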