# Spaces status: Runtime error (page header captured along with the file; not part of the module)
import asyncio | |
import os | |
# Tools | |
from contextlib import contextmanager | |
from typing import Optional | |
import pandas as pd | |
from langchain.agents import tool | |
from langchain.agents.agent_toolkits.pandas.base import create_pandas_dataframe_agent | |
from langchain.chains.qa_with_sources.loading import load_qa_with_sources_chain | |
from langchain.docstore.document import Document | |
ROOT_DIR = "./data/" | |
from langchain.chains.qa_with_sources.loading import BaseCombineDocumentsChain | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.tools import BaseTool | |
from langchain.tools.file_management.read import ReadFileTool | |
from langchain.tools.file_management.write import WriteFileTool | |
from pydantic import Field | |
from swarms.utils.logger import logger | |
@contextmanager
def pushd(new_dir):
    """Context manager for changing the current working directory.

    Without the ``@contextmanager`` decorator this generator function cannot
    be used in a ``with`` statement, which is how ``process_csv`` uses it.

    Args:
        new_dir: Directory to switch into for the duration of the ``with`` body.

    Yields:
        None.

    Raises:
        OSError: If ``new_dir`` cannot be entered (propagated from ``os.chdir``).
    """
    prev_dir = os.getcwd()
    os.chdir(new_dir)
    try:
        yield
    finally:
        # Restore the caller's directory even when the body raised.
        os.chdir(prev_dir)
def process_csv(
    llm, csv_file_path: str, instructions: str, output_path: Optional[str] = None
) -> str:
    """Process a CSV with pandas in a limited REPL.

    Only use this after writing data to disk as a csv file.
    Any figures must be saved to disk to be viewed by the human.
    Instructions should be written in natural language, not code. Assume the dataframe is already loaded.

    Args:
        llm: Language model handed to the pandas dataframe agent.
        csv_file_path: CSV path, resolved relative to ``ROOT_DIR``.
        instructions: Natural-language description of the work to perform.
        output_path: Optional location the agent is asked to save its output to.

    Returns:
        The agent's answer, or a string of the form ``"Error: ..."`` when
        reading the CSV or running the agent raised an exception.
    """
    # All relative paths (input and output) are interpreted inside ROOT_DIR.
    with pushd(ROOT_DIR):
        try:
            frame = pd.read_csv(csv_file_path)
        except Exception as load_error:
            return f"Error: {load_error}"

        df_agent = create_pandas_dataframe_agent(
            llm, frame, max_iterations=30, verbose=False
        )

        prompt = instructions
        if output_path is not None:
            prompt = prompt + f" Save output to disk at {output_path}"

        try:
            return df_agent.run(prompt)
        except Exception as run_error:
            # Failures are reported as text rather than raised.
            return f"Error: {run_error}"
async def async_load_playwright(url: str) -> str:
    """Load the specified URL using Playwright and parse it using BeautifulSoup.

    Args:
        url: Address of the page to open in headless Chromium.

    Returns:
        The page text with ``<script>``/``<style>`` content removed and blank
        fragments dropped, one fragment per line. If anything raises while
        loading or parsing, the string ``"Error: <exception>"`` is returned
        instead.
    """
    # Imported lazily so the module can be imported without these packages.
    from bs4 import BeautifulSoup
    from playwright.async_api import async_playwright

    results = ""
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        try:
            page = await browser.new_page()
            await page.goto(url)

            page_source = await page.content()
            soup = BeautifulSoup(page_source, "html.parser")

            # Drop non-visible content before extracting text.
            for script in soup(["script", "style"]):
                script.extract()

            text = soup.get_text()
            lines = (line.strip() for line in text.splitlines())
            # Split on a double space: this separates multi-column headlines
            # while keeping ordinary sentences intact. Splitting on a single
            # space would emit one word per output line.
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            results = "\n".join(chunk for chunk in chunks if chunk)
        except Exception as e:
            results = f"Error: {e}"
        finally:
            # Close the browser on every exit path, including cancellation.
            await browser.close()
    return results
def run_async(coro):
    """Run a coroutine to completion from synchronous code.

    Args:
        coro: The coroutine (or other awaitable accepted by
            ``loop.run_until_complete``) to execute.

    Returns:
        Whatever ``coro`` returns.

    Raises:
        RuntimeError: If the current event loop is already running (raised by
            ``run_until_complete``).
    """
    try:
        event_loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current loop: happens in worker threads, after asyncio.run() has
        # cleared the loop, and on newer Python versions.
        event_loop = None

    if event_loop is None or event_loop.is_closed():
        # Install a fresh loop so later calls in this thread can reuse it.
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)

    return event_loop.run_until_complete(coro)
# Registered as a LangChain tool: WebpageQATool._run invokes
# ``browse_web_page.run(url)``, and ``.run`` exists on tool objects, not on
# plain functions. The docstring is kept short because the ``tool`` decorator
# uses it as the tool description.
@tool
def browse_web_page(url: str) -> str:
    """Verbose way to scrape a whole webpage. Likely to cause issues parsing."""
    # Bridge from synchronous tool execution to the async Playwright loader.
    return run_async(async_load_playwright(url))
def _get_text_splitter():
    """Build the default splitter used to chunk scraped page text.

    Returns:
        A ``RecursiveCharacterTextSplitter`` producing chunks of up to 500
        characters (measured with ``len``) with a 20-character overlap.
    """
    # The chunk size is deliberately small.
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=500, chunk_overlap=20, length_function=len
    )
    return splitter
# Tool that scrapes a page, splits the text into chunks, asks the QA chain
# about each group of four chunks, then asks it once more over the combined
# per-group answers.
class WebpageQATool(BaseTool):
    name = "query_webpage"
    description = "Browse a webpage and retrieve the information relevant to the question."
    # Splitter for the scraped text; defaults to _get_text_splitter().
    text_splitter: RecursiveCharacterTextSplitter = Field(default_factory=_get_text_splitter)
    # Chain called with {"input_documents": ..., "question": ...}.
    qa_chain: BaseCombineDocumentsChain

    def _run(self, url: str, question: str) -> str:
        """Useful for browsing websites and scraping the text information.

        Args:
            url: Page to scrape.
            question: Question to answer from the page content.

        Returns:
            The output of the final ``qa_chain`` call over the per-window
            answers.
        """
        # NOTE(review): requires browse_web_page to be a LangChain tool object
        # (it is called via ``.run``); confirm it is decorated accordingly.
        page_text = browse_web_page.run(url)
        page_doc = Document(page_content=page_text, metadata={"source": url})
        chunks = self.text_splitter.split_documents([page_doc])

        window = 4
        partial_answers = []
        # TODO: Handle this with a MapReduceChain
        for start in range(0, len(chunks), window):
            answer = self.qa_chain(
                {
                    "input_documents": chunks[start:start + window],
                    "question": question,
                },
                return_only_outputs=True,
            )
            partial_answers.append(f"Response from window {start} - {answer}")

        summary_doc = Document(
            page_content="\n".join(partial_answers), metadata={"source": url}
        )
        return self.qa_chain(
            {"input_documents": [summary_doc], "question": question},
            return_only_outputs=True,
        )

    async def _arun(self, url: str, question: str) -> str:
        """Async execution is not supported by this tool."""
        raise NotImplementedError
import interpreter | |
def compile(task: str):
    """
    Open Interpreter lets LLMs run code (Python, Javascript, Shell, and more) locally.
    You can chat with Open Interpreter through a ChatGPT-like interface in your terminal
    by running $ interpreter after installing.

    This provides a natural-language interface to your computer's general-purpose capabilities:

    Create and edit photos, videos, PDFs, etc.
    Control a Chrome browser to perform research
    Plot, clean, and analyze large datasets
    ...etc.
    ⚠️ Note: You'll be asked to approve code before it's run.

    Args:
        task: Natural-language description of what Open Interpreter should do.
    """
    # NOTE: the name shadows the built-in ``compile``; it is kept because it
    # is this module's public name.
    task = interpreter.chat(task, return_messages=True)
    interpreter.chat()
    # NOTE(review): the messages returned by the first chat are passed to
    # reset(); confirm the installed interpreter version accepts an argument.
    interpreter.reset(task)

    # os.environ only accepts str values; assigning the bool ``True`` raises
    # TypeError, so the flags are stored as the string "True".
    # NOTE(review): assumed to belong to this function body; they are set
    # after the chat calls above, so they only affect later runs.
    os.environ["INTERPRETER_CLI_AUTO_RUN"] = "True"
    os.environ["INTERPRETER_CLI_FAST_MODE"] = "True"
    os.environ["INTERPRETER_CLI_DEBUG"] = "True"
# Multi-modal (mm) model workers
import torch | |
from PIL import Image | |
from transformers import ( | |
BlipForQuestionAnswering, | |
BlipProcessor, | |
) | |
def VQAinference(self, inputs):
    """Answer a question about an image (VQA multi-modal worker).

    Useful when you need an answer for a question based on an image, like:
    what is the background color of the last image, how many cats in this
    figure, what is in this figure.

    Args:
        self: Not used by the body; kept so the signature stays unchanged.
        inputs: A comma separated string of two, representing the image_path
            and the question. Only the first comma separates the two parts,
            so the question itself may contain commas.

    Returns:
        The decoded answer string produced by the BLIP VQA model.

    Raises:
        ValueError: If ``inputs`` contains no comma.
    """
    # Fall back to CPU when CUDA is unavailable instead of failing in ``.to``.
    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    torch_dtype = torch.float16 if "cuda" in device else torch.float32

    # The processor and model are loaded on every call.
    processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base")
    model = BlipForQuestionAnswering.from_pretrained(
        "Salesforce/blip-vqa-base", torch_dtype=torch_dtype
    ).to(device)

    # Split once so commas inside the question are preserved; strip the
    # whitespace that commonly follows the separator.
    image_path, question = (part.strip() for part in inputs.split(",", 1))

    # Close the file handle as soon as the RGB copy has been made.
    with Image.open(image_path) as image_file:
        raw_image = image_file.convert("RGB")

    encoded = processor(raw_image, question, return_tensors="pt").to(
        device, torch_dtype
    )
    out = model.generate(**encoded)
    answer = processor.decode(out[0], skip_special_tokens=True)

    logger.debug(
        f"\nProcessed VisualQuestionAnswering, Input Image: {image_path}, Input Question: {question}, "
        f"Output Answer: {answer}"
    )
    return answer