import asyncio import os # Tools from contextlib import contextmanager from typing import Optional import pandas as pd from langchain.agents import tool from langchain.agents.agent_toolkits.pandas.base import create_pandas_dataframe_agent from langchain.chains.qa_with_sources.loading import load_qa_with_sources_chain from langchain.docstore.document import Document ROOT_DIR = "./data/" from langchain.chains.qa_with_sources.loading import BaseCombineDocumentsChain from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.tools import BaseTool from langchain.tools.file_management.read import ReadFileTool from langchain.tools.file_management.write import WriteFileTool from pydantic import Field from swarms.utils.logger import logger @contextmanager def pushd(new_dir): """Context manager for changing the current working directory.""" prev_dir = os.getcwd() os.chdir(new_dir) try: yield finally: os.chdir(prev_dir) @tool def process_csv( llm, csv_file_path: str, instructions: str, output_path: Optional[str] = None ) -> str: """Process a CSV by with pandas in a limited REPL.\ Only use this after writing data to disk as a csv file.\ Any figures must be saved to disk to be viewed by the human.\ Instructions should be written in natural language, not code. Assume the dataframe is already loaded.""" with pushd(ROOT_DIR): try: df = pd.read_csv(csv_file_path) except Exception as e: return f"Error: {e}" agent = create_pandas_dataframe_agent(llm, df, max_iterations=30, verbose=False) if output_path is not None: instructions += f" Save output to disk at {output_path}" try: result = agent.run(instructions) return result except Exception as e: return f"Error: {e}" async def async_load_playwright(url: str) -> str: """Load the specified URLs using Playwright and parse using BeautifulSoup.""" from bs4 import BeautifulSoup from playwright.async_api import async_playwright results = "" async with async_playwright() as p: browser = await p.chromium.launch(headless=True) try: page = await browser.new_page() await page.goto(url) page_source = await page.content() soup = BeautifulSoup(page_source, "html.parser") for script in soup(["script", "style"]): script.extract() text = soup.get_text() lines = (line.strip() for line in text.splitlines()) chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) results = "\n".join(chunk for chunk in chunks if chunk) except Exception as e: results = f"Error: {e}" await browser.close() return results def run_async(coro): event_loop = asyncio.get_event_loop() return event_loop.run_until_complete(coro) @tool def browse_web_page(url: str) -> str: """Verbose way to scrape a whole webpage. Likely to cause issues parsing.""" return run_async(async_load_playwright(url)) def _get_text_splitter(): return RecursiveCharacterTextSplitter( # Set a really small chunk size, just to show. chunk_size = 500, chunk_overlap = 20, length_function = len, ) class WebpageQATool(BaseTool): name = "query_webpage" description = "Browse a webpage and retrieve the information relevant to the question." text_splitter: RecursiveCharacterTextSplitter = Field(default_factory=_get_text_splitter) qa_chain: BaseCombineDocumentsChain def _run(self, url: str, question: str) -> str: """Useful for browsing websites and scraping the text information.""" result = browse_web_page.run(url) docs = [Document(page_content=result, metadata={"source": url})] web_docs = self.text_splitter.split_documents(docs) results = [] # TODO: Handle this with a MapReduceChain for i in range(0, len(web_docs), 4): input_docs = web_docs[i:i+4] window_result = self.qa_chain({"input_documents": input_docs, "question": question}, return_only_outputs=True) results.append(f"Response from window {i} - {window_result}") results_docs = [Document(page_content="\n".join(results), metadata={"source": url})] return self.qa_chain({"input_documents": results_docs, "question": question}, return_only_outputs=True) async def _arun(self, url: str, question: str) -> str: raise NotImplementedError import interpreter @tool def compile(task: str): """ Open Interpreter lets LLMs run code (Python, Javascript, Shell, and more) locally. You can chat with Open Interpreter through a ChatGPT-like interface in your terminal by running $ interpreter after installing. This provides a natural-language interface to your computer's general-purpose capabilities: Create and edit photos, videos, PDFs, etc. Control a Chrome browser to perform research Plot, clean, and analyze large datasets ...etc. ⚠️ Note: You'll be asked to approve code before it's run. """ task = interpreter.chat(task, return_messages=True) interpreter.chat() interpreter.reset(task) os.environ["INTERPRETER_CLI_AUTO_RUN"] = True os.environ["INTERPRETER_CLI_FAST_MODE"] = True os.environ["INTERPRETER_CLI_DEBUG"] = True # mm model workers import torch from PIL import Image from transformers import ( BlipForQuestionAnswering, BlipProcessor, ) @tool def VQAinference(self, inputs): """ Answer Question About The Image, VQA Multi-Modal Worker agent description="useful when you need an answer for a question based on an image. " "like: what is the background color of the last image, how many cats in this figure, what is in this figure. " "The input to this tool should be a comma separated string of two, representing the image_path and the question", """ device = "cuda:0" torch_dtype = torch.float16 if "cuda" in device else torch.float32 processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base") model = BlipForQuestionAnswering.from_pretrained( "Salesforce/blip-vqa-base", torch_dtype=torch_dtype ).to(device) image_path, question = inputs.split(",") raw_image = Image.open(image_path).convert("RGB") inputs = processor(raw_image, question, return_tensors="pt").to( device, torch_dtype ) out = model.generate(**inputs) answer = processor.decode(out[0], skip_special_tokens=True) logger.debug( f"\nProcessed VisualQuestionAnswering, Input Image: {image_path}, Input Question: {question}, " f"Output Answer: {answer}" ) return answer