File size: 3,591 Bytes
105b369
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import json
from os import getenv
from typing import Optional

from phi.tools import Toolkit
from phi.utils.log import logger

try:
    import serpapi
except ImportError:
    raise ImportError("`google-search-results` not installed.")


class SerpApiToolkit(Toolkit):
    """Toolkit exposing SerpApi-backed Google (and optionally Youtube) search.

    Each search method returns a string: either a JSON document with a fixed
    set of keys taken from the SerpApi response, or a plain-text message when
    the API key or query is missing or when the request raises.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        search_youtube: bool = False,
    ):
        """
        Create the toolkit and register its search functions.

        Args:
            api_key (Optional[str]): SerpApi API key. When None, the value of
                the SERPAPI_KEY environment variable is read at construction
                time.
            search_youtube (bool): When True, the Youtube search function is
                registered in addition to the Google search function.
        """
        super().__init__(name="serpapi_tools")

        # Resolve the environment fallback here, not in the signature: a default
        # of `getenv(...)` is evaluated once at import time and would miss a
        # SERPAPI_KEY that is set (or loaded from a .env file) after import.
        # Only None triggers the fallback, so an explicitly passed value
        # (including an empty string) is kept as given.
        self.api_key = api_key if api_key is not None else getenv("SERPAPI_KEY")
        if not self.api_key:
            logger.warning("No Serpapi API key provided")

        self.register(self.search_google)
        # The `search_youtube` parameter shadows the method name only as a bare
        # local; `self.search_youtube` still refers to the bound method.
        if search_youtube:
            self.register(self.search_youtube)

    def search_google(self, query: str) -> str:
        """
        Search Google using the Serpapi API. Returns the search results.

        Args:
            query(str): The query to search for.

        Returns:
            str: The search results from Google.
                Keys:
                    - 'search_results': List of organic search results.
                    - 'recipes_results': List of recipes search results.
                    - 'shopping_results': List of shopping search results.
                    - 'knowledge_graph': The knowledge graph.
                    - 'related_questions': List of related questions.
        """

        try:
            # Missing inputs are reported as plain-text messages, not raised.
            if not self.api_key:
                return "Please provide an API key"
            if not query:
                return "Please provide a query to search for"

            logger.info(f"Searching Google for: {query}")

            params = {"q": query, "api_key": self.api_key}

            search = serpapi.GoogleSearch(params)
            results = search.get_dict()

            # Keep only the sections of interest; a section absent from the
            # response is represented by an empty string.
            filtered_results = {
                "search_results": results.get("organic_results", ""),
                "recipes_results": results.get("recipes_results", ""),
                "shopping_results": results.get("shopping_results", ""),
                "knowledge_graph": results.get("knowledge_graph", ""),
                "related_questions": results.get("related_questions", ""),
            }

            return json.dumps(filtered_results)

        except Exception as e:
            # Any failure is returned as text rather than propagated.
            return f"Error searching for the query {query}: {e}"

    def search_youtube(self, query: str) -> str:
        """
        Search Youtube using the Serpapi API. Returns the search results.

        Args:
            query(str): The query to search for.

        Returns:
            str: The video search results from Youtube.
                Keys:
                    - 'video_results': List of video results.
                    - 'movie_results': List of movie results.
                    - 'channel_results': List of channel results.
        """

        try:
            # Missing inputs are reported as plain-text messages, not raised.
            if not self.api_key:
                return "Please provide an API key"
            if not query:
                return "Please provide a query to search for"

            logger.info(f"Searching Youtube for: {query}")

            # The Youtube search takes the query under "search_query", not "q".
            params = {"search_query": query, "api_key": self.api_key}

            search = serpapi.YoutubeSearch(params)
            results = search.get_dict()

            # Keep only the sections of interest; a section absent from the
            # response is represented by an empty string.
            filtered_results = {
                "video_results": results.get("video_results", ""),
                "movie_results": results.get("movie_results", ""),
                "channel_results": results.get("channel_results", ""),
            }

            return json.dumps(filtered_results)

        except Exception as e:
            # Any failure is returned as text rather than propagated.
            return f"Error searching for the query {query}: {e}"