Spaces:
Sleeping
Sleeping
| import os | |
| from langchain_community.tools import Tool, BraveSearch, YouTubeSearchTool | |
| from langchain_community.tools import DuckDuckGoSearchResults, GoogleSearchResults | |
| from langchain_community.tools import WikipediaQueryRun | |
| from langchain_community.utilities import WikipediaAPIWrapper, WolframAlphaAPIWrapper | |
| from langchain_community.tools import WolframAlphaQueryRun | |
| from typing import Any, Dict, List, Optional | |
| import json | |
| import re | |
| from datetime import datetime, timedelta | |
| import io # for BytesIO | |
# Structured tools: use langchain's @tool decorator when available, otherwise
# install a no-op stand-in so the module still imports without langchain_core.
try:
    from langchain_core.tools import tool
except Exception:
    def tool(*args, **kwargs):
        """No-op stand-in for langchain's ``@tool`` decorator.

        Supports both bare ``@tool`` usage and parameterized
        ``@tool("name", ...)`` usage; in either case the decorated
        function is returned unchanged. (The previous fallback returned
        the inner wrapper on bare usage, silently replacing the function.)
        """
        # Bare usage: @tool applied directly to a single callable.
        if len(args) == 1 and callable(args[0]) and not kwargs:
            return args[0]

        # Parameterized usage: @tool(...) must return a decorator.
        def _wrap(fn):
            return fn
        return _wrap
# Optional third-party dependencies. Each import is defensive so the module
# still loads when a package is missing; the tools that need a missing
# dependency report an {"ok": False, ...} error at call time instead of
# crashing at import time.
try:
    from youtube_transcript_api import (
        YouTubeTranscriptApi,
        TranscriptsDisabled,
        NoTranscriptFound,
    )
except Exception:
    YouTubeTranscriptApi = None  # type: ignore
    # Bind the exception names to the base Exception so the
    # `except TranscriptsDisabled` / `except NoTranscriptFound` clauses
    # below remain syntactically valid without the package.
    TranscriptsDisabled = Exception  # type: ignore
    NoTranscriptFound = Exception  # type: ignore
try:
    from dateutil import parser as date_parser  # flexible date-string parsing
    from dateutil.relativedelta import relativedelta  # month/year arithmetic
except Exception:
    date_parser = None  # type: ignore
    relativedelta = None  # type: ignore
try:
    from zoneinfo import ZoneInfo  # stdlib on Python >= 3.9
except Exception:
    ZoneInfo = None  # type: ignore
try:
    import pandas as pd  # required by read_excel
except Exception:
    pd = None  # type: ignore
try:
    import requests  # required for HTTP(S) fetching in read_excel/read_text
except Exception:
    requests = None  # type: ignore
| def _parse_video_id(url_or_id: str) -> Optional[str]: | |
| s = (url_or_id or "").strip() | |
| if re.fullmatch(r"[0-9A-Za-z_-]{11}", s): | |
| return s | |
| try: | |
| from urllib.parse import urlparse, parse_qs | |
| u = urlparse(s) | |
| if u.netloc.endswith(("youtube.com", "m.youtube.com", "music.youtube.com")): | |
| qs = parse_qs(u.query) | |
| v = (qs.get("v") or [""])[0] | |
| if re.fullmatch(r"[0-9A-Za-z_-]{11}", v): | |
| return v | |
| if u.netloc.endswith("youtu.be"): | |
| vid = u.path.lstrip("/").split("/")[0] | |
| if re.fullmatch(r"[0-9A-Za-z_-]{11}", vid): | |
| return vid | |
| except Exception: | |
| pass | |
| return None | |
def _to_dt(value: str, tz: Optional[str] = None) -> datetime:
    """Parse *value* into a datetime, optionally localized to *tz*.

    Prefers dateutil's flexible parser when installed; otherwise falls
    back to ISO-8601 parsing and finally to a plain ``%Y-%m-%d`` format.
    Timezone attachment is best-effort and silently skipped on failure.
    """
    if date_parser is None:
        try:
            parsed = datetime.fromisoformat(value)
        except Exception:
            parsed = datetime.strptime(value, "%Y-%m-%d")
    else:
        parsed = date_parser.parse(value)
    if tz and ZoneInfo is not None:
        try:
            zone = ZoneInfo(tz)
            if parsed.tzinfo is None:
                # Naive datetime: attach the zone without shifting the clock.
                parsed = parsed.replace(tzinfo=zone)
            else:
                # Aware datetime: convert into the requested zone.
                parsed = parsed.astimezone(zone)
        except Exception:
            pass
    return parsed
def youtube_transcript(video: str, languages: Optional[List[str]] = None, max_chars: int = 8000) -> Dict[str, Any]:
    """
    Get YouTube transcript for a video URL or ID.
    Params:
    - video: URL or 11-char video ID
    - languages: preferred languages, e.g. ["vi","en"]
    - max_chars: truncate long transcripts (0 disables truncation)
    """
    if YouTubeTranscriptApi is None:
        return {"ok": False, "error": "youtube-transcript-api not installed. pip install youtube-transcript-api"}
    vid = _parse_video_id(video)
    if not vid:
        return {"ok": False, "error": "Invalid YouTube video id/url."}
    preferred = languages or ["vi", "en"]
    try:
        segments = None
        # Step 1: try the caller's preferred languages directly.
        try:
            segments = YouTubeTranscriptApi.get_transcript(vid, languages=preferred)
        except NoTranscriptFound:
            # Step 2: fall back to an English transcript.
            try:
                segments = YouTubeTranscriptApi.get_transcript(vid, languages=["en"])
            except Exception:
                pass
        # Step 3: last resort — translate any translatable transcript to English.
        if not segments:
            try:
                for candidate in YouTubeTranscriptApi.list_transcripts(vid):
                    if candidate.is_translatable and "en" in preferred:
                        segments = candidate.translate("en").fetch()
                        break
            except Exception:
                pass
        if not segments:
            return {"ok": False, "error": "No transcript available."}
        text = " ".join(part.get("text", "") for part in segments).strip()
        if max_chars and len(text) > max_chars:
            text = text[:max_chars] + " ...[truncated]..."
        return {"ok": True, "data": {"video_id": vid, "text": text, "segments": segments}}
    except TranscriptsDisabled:
        return {"ok": False, "error": "Transcripts are disabled for this video."}
    except Exception as e:
        return {"ok": False, "error": f"Transcript fetch failed: {e}"}
def youtube_transcript_srt(video: str, languages: Optional[List[str]] = None, max_segments: Optional[int] = None) -> Dict[str, Any]:
    """
    Return the YouTube transcript as SRT captions.
    Params:
    - video: URL or 11-char video ID
    - languages: preferred languages, e.g. ["vi","en"]
    - max_segments: limit number of caption segments (optional)
    """
    try:
        # Reuse the existing transcript tool to fetch segments;
        # max_chars=0 disables truncation so all segments are available.
        res = youtube_transcript(video=video, languages=languages, max_chars=0)
        if not res.get("ok"):
            return res
        segs = (res.get("data") or {}).get("segments") or []
        if max_segments is not None and max_segments > 0:
            segs = segs[:max_segments]

        def _srt_time(sec: float) -> str:
            # Work in whole milliseconds: rounding the fractional part
            # separately could produce a malformed ",1000" field
            # (e.g. sec=1.9996 rounded to 1000 ms on top of 1 s).
            total_ms = max(0, int(round(float(sec or 0.0) * 1000)))
            ms = total_ms % 1000
            s = (total_ms // 1000) % 60
            m = (total_ms // 60000) % 60
            h = total_ms // 3600000
            return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"

        lines: List[str] = []
        for i, seg in enumerate(segs, 1):
            start = float(seg.get("start", 0.0))
            end = start + float(seg.get("duration", 0.0))
            text = str(seg.get("text", "")).strip()
            lines.append(str(i))
            lines.append(f"{_srt_time(start)} --> {_srt_time(end)}")
            lines.append(text)
            lines.append("")  # blank line between blocks
        srt = "\n".join(lines).strip() + ("\n" if lines else "")
        return {"ok": True, "data": {"srt": srt, "segments": len(segs)}}
    except Exception as e:
        return {"ok": False, "error": f"SRT generation failed: {e}"}
def date_today(tz: Optional[str] = None) -> Dict[str, Any]:
    """
    Return today's datetime fields (iso, date, time).
    Uses the given IANA timezone when zoneinfo is available,
    otherwise the system's local time.
    """
    if tz and ZoneInfo is not None:
        now = datetime.now(ZoneInfo(tz))
    else:
        now = datetime.now()
    fields = {
        "iso": now.isoformat(),
        "date": now.date().isoformat(),
        "time": now.time().isoformat(timespec="seconds"),
    }
    return {"ok": True, "data": fields}
def date_parse(date_str: str, tz: Optional[str] = None) -> Dict[str, Any]:
    """
    Parse a date/time string into ISO fields (iso, date, time).
    """
    try:
        parsed = _to_dt(date_str, tz)
    except Exception as e:
        return {"ok": False, "error": f"Parse failed: {e}"}
    fields = {
        "iso": parsed.isoformat(),
        "date": parsed.date().isoformat(),
        "time": parsed.time().isoformat(timespec="seconds"),
    }
    return {"ok": True, "data": fields}
def date_add(date_str: str, days: int = 0, months: int = 0, years: int = 0, tz: Optional[str] = None) -> Dict[str, Any]:
    """
    Add/subtract days/months/years to a date/time.
    Month/year arithmetic requires python-dateutil.
    """
    try:
        base = _to_dt(date_str, tz)
        if relativedelta is None:
            # Without dateutil only day arithmetic is supported.
            if months or years:
                return {"ok": False, "error": "Month/year arithmetic needs python-dateutil. pip install python-dateutil"}
            shifted = base + timedelta(days=days)
        else:
            shifted = base + relativedelta(days=days, months=months, years=years)
        fields = {
            "iso": shifted.isoformat(),
            "date": shifted.date().isoformat(),
            "time": shifted.time().isoformat(timespec="seconds"),
        }
        return {"ok": True, "data": fields}
    except Exception as e:
        return {"ok": False, "error": f"Add failed: {e}"}
def date_diff(start: str, end: str, unit: str = "days", tz: Optional[str] = None) -> Dict[str, Any]:
    """
    Difference between two date/times. unit: days|hours|minutes|seconds.
    Unknown units fall back to days.
    """
    divisors = {"seconds": 1, "minutes": 60, "hours": 3600, "days": 86400}
    try:
        d1 = _to_dt(start, tz)
        d2 = _to_dt(end, tz)
        seconds = (d2 - d1).total_seconds()
        unit = (unit or "days").lower()
        if unit not in divisors:
            unit = "days"
        return {"ok": True, "data": {"value": seconds / divisors[unit], "unit": unit}}
    except Exception as e:
        return {"ok": False, "error": f"Diff failed: {e}"}
def next_weekday(date_str: str, weekday: int, include_today: bool = False, tz: Optional[str] = None) -> Dict[str, Any]:
    """
    Next date matching weekday (0=Mon..6=Sun).
    When the base date already falls on the target weekday, it is
    returned only if include_today is True; otherwise a week is added.
    """
    try:
        anchor = _to_dt(date_str, tz).date()
        target_wd = int(weekday) % 7
        offset = (target_wd - anchor.weekday()) % 7
        if offset == 0 and not include_today:
            offset = 7  # same weekday today: jump to next week
        result = anchor + timedelta(days=offset)
        return {"ok": True, "data": {"date": result.isoformat(), "weekday": target_wd}}
    except Exception as e:
        return {"ok": False, "error": f"next_weekday failed: {e}"}
def date_format(date_str: str, fmt: str = "%Y-%m-%d %H:%M:%S", tz: Optional[str] = None) -> Dict[str, Any]:
    """
    Format a date/time string with strftime.
    """
    try:
        formatted = _to_dt(date_str, tz).strftime(fmt)
    except Exception as e:
        return {"ok": False, "error": f"Format failed: {e}"}
    return {"ok": True, "data": {"formatted": formatted}}
def read_excel(path_or_url: str, sheet: Optional[str] = None, nrows: int = 100, usecols: Optional[str] = None, header: Optional[int] = 0) -> Dict[str, Any]:
    """
    Read a worksheet from an Excel file (.xlsx/.xls/.xlsm) from a local path or HTTP(S) URL.
    Params:
    - path_or_url: local file path or URL.
    - sheet: sheet name or 0-based index (default: first sheet).
    - nrows: max number of rows to return (default: 100).
    - usecols: Excel-style column selection, e.g., 'A:D' or 'A,C:E'.
    - header: row index to use as header (default: 0). Use None for no header.
    """
    if pd is None:
        return {"ok": False, "error": "pandas not installed. pip install pandas openpyxl"}
    src = (path_or_url or "").strip()
    if not src:
        return {"ok": False, "error": "Missing path_or_url"}
    try:
        source: Any
        if re.match(r"^https?://", src, re.I):
            if requests is None:
                return {"ok": False, "error": "requests not installed for URL fetching. pip install requests"}
            resp = requests.get(src, timeout=30)
            resp.raise_for_status()
            source = io.BytesIO(resp.content)
        elif os.path.exists(src):
            source = src
        else:
            return {"ok": False, "error": f"File not found: {src}"}
        sheet_name = 0 if sheet is None else sheet
        row_limit = None if (nrows is None or nrows <= 0) else nrows
        frame = pd.read_excel(
            source,
            sheet_name=sheet_name,
            nrows=row_limit,
            usecols=usecols,
            header=header,
        )
        sheet_used = sheet_name
        if isinstance(frame, dict):
            # Safety net: some engines hand back {sheet_name: DataFrame}.
            sheet_used, frame = next(iter(frame.items()))
        if nrows and nrows > 0:
            frame = frame.head(nrows)
        columns = [str(c) for c in frame.columns.tolist()]
        records = frame.to_dict(orient="records")
        return {
            "ok": True,
            "data": {
                "sheet": sheet_used,
                "columns": columns,
                "records": records,
                "info": {"rows": len(records), "cols": len(columns)},
            },
        }
    except Exception as e:
        return {"ok": False, "error": f"Excel read failed: {e}"}
def read_text(path_or_url: str, max_chars: int = 20000, encoding: Optional[str] = None) -> Dict[str, Any]:
    """
    Read a text file from a local path or HTTP(S) URL.
    Params:
    - path_or_url: local file path or URL.
    - max_chars: maximum characters to return (default: 20000).
    - encoding: optional text encoding override; if omitted, try to detect.
    """
    src = (path_or_url or "").strip()
    if not src:
        return {"ok": False, "error": "Missing path_or_url"}
    try:
        used_encoding = "utf-8"
        text = ""
        if re.match(r"^https?://", src, re.I):
            if requests is None:
                return {"ok": False, "error": "requests not installed for URL fetching. pip install requests"}
            resp = requests.get(src, timeout=30)
            resp.raise_for_status()
            # Pick the first usable encoding hint from the response.
            used_encoding = (
                encoding
                or resp.encoding
                or getattr(resp, "apparent_encoding", None)
                or "utf-8"
            )
            text = resp.content.decode(used_encoding, errors="replace")
        else:
            if not os.path.exists(src):
                return {"ok": False, "error": f"File not found: {src}"}
            candidates = [encoding] if encoding else [
                "utf-8", "utf-16", "utf-16-le", "utf-16-be", "latin-1",
            ]
            decoded = False
            for candidate in candidates:
                # Strict decoding: a candidate that fails on any byte is rejected.
                try:
                    with open(src, "r", encoding=candidate, errors="strict") as fh:
                        text = fh.read()
                except Exception:
                    continue
                used_encoding = candidate or "utf-8"
                decoded = True
                break
            if not decoded:
                # Nothing decoded strictly; latin-1 maps every byte, so this
                # always succeeds (possibly with mojibake).
                with open(src, "rb") as fh:
                    raw = fh.read()
                used_encoding = "latin-1"
                text = raw.decode(used_encoding, errors="replace")
        truncated = False
        if max_chars and max_chars > 0 and len(text) > max_chars:
            text = text[:max_chars] + " ...[truncated]..."
            truncated = True
        return {
            "ok": True,
            "data": {
                "path": src,
                "encoding": used_encoding,
                "truncated": truncated,
                "length": len(text),
                "text": text,
            },
        }
    except Exception as e:
        return {"ok": False, "error": f"Text read failed: {e}"}
def get_tools():
    """
    Returns a list of tools that can be used by the agent:
    search tools (YouTube, DuckDuckGo, Wikipedia) followed by the
    structured date/file/transcript tools defined in this module.
    """
    wiki_wrapper = WikipediaAPIWrapper()
    search_tools = [
        Tool(
            name="YouTubeSearch",
            func=YouTubeSearchTool().run,
            description="Search YouTube for videos."
        ),
        Tool(
            name="DuckDuckGoSearch",
            func=DuckDuckGoSearchResults().run,
            description="Search the web using DuckDuckGo."
        ),
        # NOTE: GoogleSearch and WolframAlphaQuery are intentionally
        # disabled — they require API keys that may not be configured.
        Tool(
            name="WikipediaQuery",
            func=WikipediaQueryRun(api_wrapper=wiki_wrapper).run,
            description="Query Wikipedia for information."
        ),
    ]
    structured_tools = [
        youtube_transcript,
        date_today,
        date_parse,
        date_add,
        date_diff,
        next_weekday,
        date_format,
        read_text,
        read_excel,
        youtube_transcript_srt,
    ]
    return search_tools + structured_tools