"""Gradio chat app that optionally augments answers with a web search.

A first model call decides whether the user's message needs a web search
(by emitting a small JSON "function call"); the final answer is then
streamed from a text-generation model, with the fetched page text injected
into the prompt when a search was performed.
"""

import json
import logging
import random
import re
import time
import uuid
from threading import Thread

import cv2
import gradio as gr
import requests
import torch
from bs4 import BeautifulSoup
from gradio_client import Client, file
from huggingface_hub import InferenceClient
from PIL import Image
from transformers import LlavaProcessor, LlavaForConditionalGeneration, TextIteratorStreamer

logger = logging.getLogger(__name__)

# Browser-like headers shared by every outgoing HTTP request.
_REQUEST_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
}


def extract_text_from_webpage(html_content):
    """Return the visible text of an HTML document.

    ``script``, ``style``, ``header`` and ``footer`` elements are removed
    before the text is collected.
    """
    soup = BeautifulSoup(html_content, "html.parser")
    for tag in soup(["script", "style", "header", "footer"]):
        tag.extract()
    return soup.get_text(strip=True)


def search(query, *, num_results=3, max_chars_per_page=8000, timeout=5):
    """Run a Google search and fetch the text of each result page.

    Args:
        query: Search terms sent to Google.
        num_results: Value of the ``num`` query parameter (default 3).
        max_chars_per_page: Page text is truncated to this many characters.
        timeout: Per-request timeout in seconds.

    Returns:
        A list of ``{"link": str, "text": str | None}`` dicts; ``text`` is
        ``None`` when the result page could not be fetched.

    Raises:
        requests.exceptions.RequestException: If the search request itself
            fails (individual page failures are recorded, not raised).
    """
    all_results = []
    with requests.Session() as session:
        resp = session.get(
            url="https://www.google.com/search",
            headers=_REQUEST_HEADERS,
            params={"q": query, "num": num_results, "udm": 14},
            timeout=timeout,
        )
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")
        for result in soup.find_all("div", attrs={"class": "g"}):
            anchor = result.find("a", href=True)
            if anchor is None:
                # Result block without a link: nothing to fetch.
                continue
            link = anchor["href"]
            try:
                # TLS verification is left at the requests default (enabled);
                # certificate errors surface as RequestException below.
                webpage = session.get(link, headers=_REQUEST_HEADERS, timeout=timeout)
                webpage.raise_for_status()
            except requests.exceptions.RequestException:
                all_results.append({"link": link, "text": None})
                continue
            visible_text = extract_text_from_webpage(webpage.text)
            all_results.append({"link": link, "text": visible_text[:max_chars_per_page]})
    return all_results


client_gemma = InferenceClient("mistralai/Mistral-7B-Instruct-v0.3")
client_llama = InferenceClient("meta-llama/Meta-Llama-3-8B-Instruct")

# Unused by respond() (which builds its own local list); kept so the
# module-level name continues to exist.
func_caller = []


def _extract_function_call(text):
    """Parse the outermost ``{...}`` span of *text* as a JSON object.

    Returns the decoded dict, or ``None`` when no valid JSON object is found.
    """
    if not text:
        return None
    start = text.find("{")
    end = text.rfind("}")
    if start == -1 or end < start:
        return None
    try:
        parsed = json.loads(text[start:end + 1])
    except json.JSONDecodeError:
        return None
    return parsed if isinstance(parsed, dict) else None


def _build_prompt(history, message_text, web_context=None):
    """Build the plain-text prompt for the answering model.

    When *web_context* is not ``None`` a ``web_result`` section is inserted
    between the user's message and the assistant turn.
    """
    prompt = "".join(
        f"\nuser\n{str(msg[0])}\nassistant\n{str(msg[1])}" for msg in history
    )
    if web_context is None:
        return prompt + f"\nuser\n{message_text}\nassistant\n"
    return prompt + f"\nuser\n{message_text}\nweb_result\n{web_context}\nassistant\n"


def _stream_answer(prompt):
    """Yield the progressively accumulated answer text for *prompt*."""
    stream = client_llama.text_generation(
        prompt,
        max_new_tokens=2000,
        do_sample=True,
        stream=True,
        details=True,
        return_full_text=False,
    )
    output = ""
    for chunk in stream:
        if chunk.token.text != "":
            output += chunk.token.text
            yield output


def respond(message, history):
    """Answer a chat message, searching the web first if the router asks to.

    Args:
        message: Multimodal textbox payload; only ``message["text"]`` is used.
        history: Sequence of ``(user, assistant)`` pairs from earlier turns.

    Yields:
        The answer text accumulated so far, once per streamed token.
    """
    message_text = message["text"]
    functions_metadata = [
        {"type": "function", "function": {"name": "web_search", "description": "Search query on google", "parameters": {"type": "object", "properties": {"query": {"type": "string", "description": "web search query"}}, "required": ["query"]}}},
    ]

    conversation = []
    for msg in history:
        conversation.append({"role": "user", "content": f"{str(msg[0])}"})
        conversation.append({"role": "assistant", "content": f"{str(msg[1])}"})
    conversation.append({"role": "user", "content": f'[SYSTEM]You are a helpful assistant. You have access to the following functions: \n {str(functions_metadata)}\n\nTo use these functions respond with:\n {{ "name": "function_name", "arguments": {{ "arg_1": "value_1", "arg_1": "value_1", ... }} }} [USER] {message_text}'})

    # Step 1: ask the router model whether a tool call is wanted. Any failure
    # here must not break the chat, so fall back to a plain answer.
    call = None
    try:
        completion = client_gemma.chat_completion(conversation, max_tokens=200)
        # Read the message content directly instead of parsing repr(completion).
        call = _extract_function_call(completion.choices[0].message.content)
    except Exception:
        logger.exception("Function-call routing failed; answering without tools")
    print(f"\n{call}")

    # Step 2: run the web search when requested.
    web_context = None
    if call is not None and call.get("name") == "web_search":
        arguments = call.get("arguments")
        query = arguments.get("query") if isinstance(arguments, dict) else None
        if query:
            gr.Info("Searching Web")
            try:
                web_results = search(query)
            except requests.exceptions.RequestException:
                logger.exception("Web search failed; answering without web results")
            else:
                gr.Info("Extracting relevant Info")
                web_context = ' '.join([f"Link: {res['link']}\nText: {res['text']}\n\n" for res in web_results if res['text']])

    # Step 3: stream the final answer. The original referenced an undefined
    # `client_mixtral` for the web branch; the defined llama client is used
    # for both branches.
    yield from _stream_answer(_build_prompt(history, message_text, web_context))


demo = gr.ChatInterface(
    fn=respond,
    chatbot=gr.Chatbot(show_copy_button=True, likeable=True, layout="panel"),
    description=" ",
    textbox=gr.MultimodalTextbox(),
    multimodal=True,
    concurrency_limit=200,
)
demo.launch()