File size: 7,009 Bytes
d8d14f1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import concurrent.futures
import re
from collections import Counter
from typing import Any, Callable, List, Optional

from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.utils.file_processing import create_file
from swarms.utils.loguru_logger import initialize_logger

logger = initialize_logger(log_folder="majority_voting")


def extract_last_python_code_block(text):
    """
    Extracts the last Python code block from the given text.

    Args:
        text (str): The text to search for Python code blocks.

    Returns:
        str or None: The last Python code block found in the text, or None if no code block is found.
    """
    # The regular expression pattern for Python code blocks
    pattern = r"```[pP]ython(.*?)```"

    # Find all matches in the text
    matches = re.findall(pattern, text, re.DOTALL)

    # If there are matches, return the last one
    if matches:
        return matches[-1].strip()
    else:
        return None


def parse_code_completion(agent_response, question):
    """
    Parses the code completion response from the agent and extracts the last Python code block.

    Args:
        agent_response (str): The response from the agent.
        question (str): The original question.

    Returns:
        tuple: A tuple containing the parsed Python code and a boolean indicating success.
    """
    python_code = extract_last_python_code_block(agent_response)
    if python_code is None:
        if agent_response.count("impl]") == 0:
            python_code = agent_response
        else:
            python_code_lines = agent_response.split("\n")
            python_code = ""
            in_func = False
            for line in python_code_lines:
                if in_func:
                    python_code += line + "\n"
                if "impl]" in line:
                    in_func = True
    if python_code.count("def") == 0:
        python_code = question + python_code
    return python_code, True


def most_frequent(
    clist: list,
    cmp_func: callable = None,
):
    """
    Finds the most frequent element in a list based on a comparison function.

    Args:
        clist (list): The list of elements to search.
        cmp_func (function, optional): The comparison function used to determine the frequency of elements.
            If not provided, the default comparison function is used.

    Returns:
        tuple: A tuple containing the most frequent element and its frequency.
    """
    counter = 0
    num = clist[0]

    for i in clist:
        current_frequency = sum(cmp_func(i, item) for item in clist)
        if current_frequency > counter:
            counter = current_frequency
            num = i

    return num, counter


def majority_voting(answers: List[str]):
    """
    Performs majority voting on a list of answers and returns the most common answer.

    Args:
        answers (list): A list of answers.

    Returns:
        The most common answer in the list.
    """
    counter = Counter(answers)
    if counter:
        answer = counter.most_common(1)[0][0]
    else:
        answer = "I don't know"

    return answer


class MajorityVoting:
    """
    Class representing a majority voting system for agents.

    Args:
        agents (list): A list of agents to be used in the majority voting system.
        output_parser (function, optional): A function used to parse the output of the agents.
            If not provided, the default majority voting function is used.
        autosave (bool, optional): A boolean indicating whether to autosave the conversation to a file.
        verbose (bool, optional): A boolean indicating whether to enable verbose logging.
    Examples:
        >>> from swarms.structs.agent import Agent
        >>> from swarms.structs.majority_voting import MajorityVoting
        >>> agents = [
        ...     Agent("GPT-3"),
        ...     Agent("Codex"),
        ...     Agent("Tabnine"),
        ... ]
        >>> majority_voting = MajorityVoting(agents)
        >>> majority_voting.run("What is the capital of France?")
        'Paris'

    """

    def __init__(
        self,
        name: str = "MajorityVoting",
        description: str = "A majority voting system for agents",
        agents: List[Agent] = [],
        output_parser: Optional[Callable] = majority_voting,
        autosave: bool = False,
        verbose: bool = False,
        *args,
        **kwargs,
    ):
        self.agents = agents
        self.output_parser = output_parser
        self.autosave = autosave
        self.verbose = verbose

        self.conversation = Conversation(
            time_enabled=True, *args, **kwargs
        )

        # If autosave is enabled, save the conversation to a file
        if self.autosave:
            create_file(
                str(self.conversation), "majority_voting.json"
            )

        # Log the agents
        logger.info("Initializing majority voting system")
        # Length of agents
        logger.info(f"Number of agents: {len(self.agents)}")
        logger.info(
            "Agents:"
            f" {', '.join(agent.agent_name for agent in self.agents)}"
        )

    def run(self, task: str, *args, **kwargs) -> List[Any]:
        """
        Runs the majority voting system and returns the majority vote.

        Args:
            task (str): The task to be performed by the agents.
            *args: Variable length argument list.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            List[Any]: The majority vote.

        """
        # Route to each agent
        with concurrent.futures.ThreadPoolExecutor() as executor:
            logger.info("Running agents concurrently")

            futures = [
                executor.submit(agent.run, task, *args)
                for agent in self.agents
            ]
            results = [
                future.result()
                for future in concurrent.futures.as_completed(futures)
            ]

        # Add responses to conversation and log them
        for agent, response in zip(self.agents, results):
            response = (
                response if isinstance(response, list) else [response]
            )
            self.conversation.add(agent.agent_name, response)
            logger.info(
                f"[Agent][Name: {agent.agent_name}][Response:"
                f" {response}]"
            )

        # Perform majority voting on the conversation
        responses = [
            message["content"]
            for message in self.conversation.conversation_history
            if message["role"] == "agent"
        ]

        # If an output parser is provided, parse the responses
        if self.output_parser is not None:
            majority_vote = self.output_parser(
                responses, *args, **kwargs
            )
        else:
            majority_vote = majority_voting(responses)

        # Return the majority vote
        return majority_vote