File size: 3,153 Bytes
a35fa4d
9f68e0d
a35fa4d
 
 
c4df480
a35fa4d
 
9f68e0d
 
 
 
 
 
 
c121218
 
 
a35fa4d
c121218
 
 
 
a35fa4d
c121218
a35fa4d
9f68e0d
c121218
 
 
a35fa4d
c121218
 
 
 
a35fa4d
c121218
a35fa4d
 
c4df480
78dfff8
 
c4df480
 
78dfff8
c4df480
78dfff8
c4df480
 
 
 
 
 
 
 
 
 
4bd8f86
c4df480
 
 
 
 
 
 
 
 
 
 
 
 
a35fa4d
78dfff8
c4df480
a35fa4d
c4df480
 
 
 
 
 
 
 
 
 
 
a35fa4d
c4df480
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import re
from typing import List

import gradio as gr

from components import MAX_TASKS, all_tasks, Task


def _is_task_row_fully_invisible(row: List[int]) -> bool:
    for visible in row:
        if bool(visible):
            return False
    return True


def add_task(*visibilities):
    """
    Build the updates that reveal the first slot whose visibility flag is falsy.

    Params:
        - visibilities: One flag per task slot; a truthy value means the slot is shown.

    Returns:
        MAX_TASKS Box updates followed by MAX_TASKS values. When a falsy flag is found
        at position p, the first p + 1 boxes are set visible (value 1) and the remaining
        ones hidden (value 0). When every flag is truthy, empty Box/Number updates are
        returned instead.
    """
    first_hidden = next(
        (pos for pos, flag in enumerate(visibilities) if not bool(flag)),
        None,
    )
    if first_hidden is None:
        # Nothing left to reveal: leave every component as it is.
        return [gr.Box.update()] * MAX_TASKS + [gr.Number.update()] * MAX_TASKS

    n_shown = first_hidden + 1
    n_hidden = MAX_TASKS - n_shown
    box_updates = (
        [gr.Box.update(visible=True)] * n_shown
        + [gr.Box.update(visible=False)] * n_hidden
    )
    # NOTE(review): the 1/0 values presumably feed the Number components that
    # carry the visibility flags back in — confirm against the caller's wiring.
    flag_values = [1] * n_shown + [0] * n_hidden
    return box_updates + flag_values


def remove_task(*visibilities):
    """
    Build the updates that hide the last slot whose visibility flag is truthy.

    Params:
        - visibilities: One flag per task slot; a truthy value means the slot is shown.

    Returns:
        MAX_TASKS Box updates followed by MAX_TASKS values. When the last truthy flag
        is at position p, the first p boxes stay visible (value 1) and every box from
        p onwards is hidden (value 0). When no flag is truthy, empty Box/Number updates
        are returned instead.
    """
    last_shown = next(
        (
            pos
            for pos in range(len(visibilities) - 1, -1, -1)
            if bool(visibilities[pos])
        ),
        None,
    )
    if last_shown is None:
        # Nothing is visible, so there is nothing to remove.
        return [gr.Box.update()] * MAX_TASKS + [gr.Number.update()] * MAX_TASKS

    n_shown = last_shown
    n_hidden = MAX_TASKS - n_shown
    box_updates = (
        [gr.Box.update(visible=True)] * n_shown
        + [gr.Box.update(visible=False)] * n_hidden
    )
    # NOTE(review): the 1/0 values presumably feed the Number components that
    # carry the visibility flags back in — confirm against the caller's wiring.
    flag_values = [1] * n_shown + [0] * n_hidden
    return box_updates + flag_values


def execute_task(task_id: int, active_index: int, error_value, *args):
    """
    Execute one task and return the outputs for every task slot in its row.

    Params:
        - task_id: This will tell us which task to execute.
        - active_index: The index of the actual task that is visible.
        - error_value: I carry around whether there is an error in the execution, to be displayed at the end.
        - args: Other variables that will be decomposed, in this order:
            1. one input per available task (indexed with active_index),
            2. the active index of each of the `task_id` previous tasks,
            3. the outputs of the previous tasks, one per available task for each previous task.

    Returns:
        A list with one output per available task (empty strings everywhere except
        at active_index when the task runs), followed by a HighlightedText update.
        The update carries `error_value` forward, or a new error message when the
        input references undefined variables or cannot be formatted.
    """
    task_id = int(task_id)
    active_index = int(active_index)
    n_avail_tasks = len(Task.available_tasks)

    task_input = args[:n_avail_tasks][active_index]
    prev_active_indexes = args[n_avail_tasks : n_avail_tasks + task_id]
    prev_task_outputs = args[n_avail_tasks + task_id :]

    error_update = gr.HighlightedText.update(
        value=error_value, visible=error_value is not None
    )
    # We need to return outputs for all tasks in the row.
    outputs = [""] * n_avail_tasks

    if not task_input:
        return outputs + [error_update]

    # Each previous task exposes the output of its active slot under the name
    # f"{Task.vname}{i}".
    vars_in_scope = {
        f"{Task.vname}{i}": prev_task_outputs[
            i * n_avail_tasks + int(prev_active_index)
        ]
        for i, prev_active_index in enumerate(prev_active_indexes)
    }
    # Get all variables referenced within the task input
    prompt_vars = re.findall(r"{(.*?)}", task_input)

    # If there is an undefined variable referenced, HighlightedText will signal the error.
    undefined_vars = set(prompt_vars).difference(vars_in_scope)
    if undefined_vars:
        return outputs + [
            gr.HighlightedText.update(
                value=[
                    (
                        f"The following variables are being used before being defined :: {undefined_vars}. Please check your tasks.",
                        "ERROR",
                    )
                ],
                visible=True,
            )
        ]

    # The regex above does not catch every malformed template (e.g. a lone "{"
    # or "}", or a field spanning a newline); str.format raises on those, so
    # report the problem through the same error channel instead of crashing.
    try:
        formatted_input = task_input.format(**vars_in_scope)
    except (ValueError, KeyError, IndexError) as err:
        return outputs + [
            gr.HighlightedText.update(
                value=[
                    (
                        f"The task input could not be formatted :: {err}. Please check your tasks.",
                        "ERROR",
                    )
                ],
                visible=True,
            )
        ]

    # Task logic gets inserted into the right index
    outputs[active_index] = all_tasks[task_id].execute(active_index, formatted_input)
    return outputs + [error_update]