|
|
from typing import Dict |
|
|
from transformers import pipeline |
|
|
from smolagents.tools import Tool |
|
|
import torchcodec |
|
|
|
|
|
|
|
|
class VisitWikiPageTool(Tool):
    """Tool that downloads a page and returns its content as markdown text.

    The page is requested with the ``User-Agent`` header supplied at
    construction time, converted to markdown, cleaned of long runs of
    newlines and finally truncated to a maximum number of characters.

    Note: the code itself does not check that the URL points at Wikipedia;
    any URL accepted by ``requests.get`` is fetched.
    """

    name = "visit_wikipage"
    description = (
        "Visits a Wikipedia page at the given url and reads its content as a markdown string. Use this to browse Wikipedia wepages and get their full content."
    )
    inputs = {
        "url": {
            "type": "string",
            "description": "The url of the webpage to visit.",
        },
        "max_length": {
            "type": "integer",
            "description": "Maximum number of characters to include in the response. Default 40000.",
            "nullable": True,
        },
    }
    output_type = "string"

    def __init__(self, user_agent: str):
        """Create the tool and remember the HTTP headers to send.

        Args:
            user_agent: Value sent as the ``User-Agent`` header on every
                request made by :meth:`forward`.
        """
        super().__init__()
        self.headers = {"User-Agent": user_agent}

    def _truncate_content(self, content: str, max_length: int) -> str:
        """Return ``content`` limited to ``max_length`` characters.

        Args:
            content: Text to limit.
            max_length: Maximum number of characters of ``content`` to keep.
                Negative values are treated as ``0``.

        Returns:
            ``content`` unchanged when it already fits; otherwise its first
            ``max_length`` characters followed by a truncation notice (the
            notice itself is appended on top of the kept characters).
        """
        # A negative limit would make the slice below count from the END of
        # the string (dropping only the tail) and produce a notice quoting a
        # negative size, so clamp it: "negative" means "keep nothing".
        limit = max(max_length, 0)
        if len(content) <= limit:
            return content
        return (
            content[:limit] + f"\n..._This content has been truncated to stay below {limit} characters_...\n"
        )

    def forward(self, url: str, max_length: int = 40000) -> str:
        """Fetch ``url`` and return its content as (possibly truncated) markdown.

        Args:
            url: Address of the page to download.
            max_length: Maximum number of characters of page content to
                return; ``None`` falls back to 40000.

        Returns:
            The markdown rendering of the page, or a human-readable error
            message when the request times out, fails, or any other
            exception occurs while processing the response.

        Raises:
            ImportError: If ``markdownify`` or ``requests`` is not installed.
        """
        # Optional dependencies are imported lazily so that merely importing
        # this module does not require them.
        try:
            import re

            import requests
            from markdownify import markdownify
            from requests.exceptions import RequestException
        except ImportError as e:
            raise ImportError(
                "You must install packages `markdownify` and `requests` to run this tool: for instance run `pip install markdownify requests`."
            ) from e

        try:
            response = requests.get(url, timeout=20, headers=self.headers)
            # Turn HTTP error statuses into exceptions handled below.
            response.raise_for_status()

            markdown_content = markdownify(response.text).strip()
            max_length = max_length if max_length is not None else 40000

            # Collapse runs of three or more newlines into one blank line.
            markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content)
            return self._truncate_content(markdown_content, max_length)

        # Failures are reported as return values (not raised) so the caller
        # always receives a string. Timeout is listed before the more general
        # RequestException so it gets its dedicated message.
        except requests.exceptions.Timeout:
            return "The request timed out. Please try again later or check the URL."
        except RequestException as e:
            return f"Error fetching the webpage: {str(e)}"
        except Exception as e:
            return f"An unexpected error occurred: {str(e)}"
|
|
|
|
|
class SpeechToTextTool(Tool):
    """Tool that transcribes in-memory audio into text.

    The audio is wrapped in a ``torchcodec`` ``AudioDecoder`` and handed to a
    ``transformers`` automatic-speech-recognition pipeline.

    NOTE(review): this module defines another class that is also named
    ``SpeechToTextTool`` further down. That later definition rebinds the
    name, so this bytes-based variant is presumably not reachable under this
    name once the module has been imported -- confirm which variant is meant
    to be kept.
    """

    name = "transcriber"
    description = "This is a tool that transcribes an audio into text. It returns the transcribed text."
    inputs = {
        "audio": {
            "type": "audio",
            "description": "The audio to transcribe it should be bytes.",
        },
        "sample_rate": {
            "type": "integer",
            "description": "The sampling rate to use to decode the audio, defaults to 16000",
            "nullable": True,
        },
    }
    output_type = "string"

    def __init__(self, model: str = "openai/whisper-small"):
        """Build the speech-recognition pipeline used by :meth:`forward`.

        Args:
            model: Model identifier passed to ``pipeline`` for the
                ``"automatic-speech-recognition"`` task.
        """
        super().__init__()
        self.pipe = pipeline("automatic-speech-recognition", model=model)

    def forward(self, audio: bytes, sample_rate: int = 16000) -> str:
        """Transcribe ``audio`` and return the recognised text.

        Args:
            audio: Audio data given to the decoder.
            sample_rate: Value passed to the decoder as ``sample_rate``;
                ``None`` falls back to 16000.

        Returns:
            The ``"text"`` entry of the pipeline output.
        """
        if sample_rate is None:
            sample_rate = 16000
        audio_decoder = torchcodec.decoders.AudioDecoder(audio, sample_rate=sample_rate)
        transcription = self.pipe(audio_decoder)
        return transcription["text"]
|
|
|
|
|
class SpeechToTextTool(Tool):
    """Tool that transcribes an audio file, given by its path, into text.

    The file is opened in binary mode, wrapped in a ``torchcodec``
    ``AudioDecoder`` and handed to a ``transformers``
    automatic-speech-recognition pipeline.

    NOTE(review): an earlier class in this module carries the same name;
    this definition comes later and therefore replaces it under the name
    ``SpeechToTextTool`` -- confirm the earlier one is intentionally
    superseded.
    """

    name = "transcriber"
    description = "This is a tool that transcribes an audio into text. It returns the transcribed text."
    inputs = {
        "audio_file": {
            "type": "string",
            "description": "The path to the audio file to transcribe.",
        },
        "sample_rate": {
            "type": "integer",
            "description": "The sampling rate to use to decode the audio, defaults to 16000",
            "nullable": True,
        },
    }
    output_type = "string"

    def __init__(self, model: str = "openai/whisper-small"):
        """Build the speech-recognition pipeline used by :meth:`forward`.

        Args:
            model: Model identifier passed to ``pipeline`` for the
                ``"automatic-speech-recognition"`` task.
        """
        super().__init__()
        self.pipe = pipeline("automatic-speech-recognition", model=model)

    def forward(self, audio_file: str, sample_rate: int = 16000) -> str:
        """Transcribe the file at ``audio_file`` and return the recognised text.

        Args:
            audio_file: Path of the audio file to open and transcribe.
            sample_rate: Value passed to the decoder as ``sample_rate``;
                ``None`` falls back to 16000.

        Returns:
            The ``"text"`` entry of the pipeline output.
        """
        rate = 16000 if sample_rate is None else sample_rate
        with open(audio_file, "rb") as handle:
            audio_decoder = torchcodec.decoders.AudioDecoder(handle, sample_rate=rate)
            # Run the pipeline while the handle is still open, in case the
            # decoder reads from it lazily rather than at construction time.
            transcription = self.pipe(audio_decoder)
        return transcription["text"]
|
|
|