File size: 4,505 Bytes
a35fa4d
9f68e0d
a35fa4d
 
 
9f68e0d
a35fa4d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9f68e0d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a35fa4d
9f68e0d
 
 
 
 
 
a35fa4d
 
 
 
9f68e0d
 
 
 
 
 
 
 
 
 
a35fa4d
9f68e0d
 
 
 
a35fa4d
 
 
4bd8f86
78dfff8
 
 
 
4bd8f86
 
78dfff8
4bd8f86
 
 
 
78dfff8
 
 
 
9f68e0d
a35fa4d
4bd8f86
 
 
78dfff8
 
a35fa4d
78dfff8
 
a35fa4d
 
 
 
 
 
 
 
 
 
 
 
 
 
78dfff8
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import re
from typing import List

import gradio as gr

from components import all_tasks, Input, MAX_INPUTS, MAX_TASKS, Task


def add_input(*visibility):
    """
    Reveal the first hidden input textbox.

    Params:
        - visibility: One flag per input slot, in display order. A truthy flag is treated as "shown".

    Returns:
        A list of `2 * MAX_INPUTS` values: one `gr.Textbox` update per slot followed by
        one 1/0 flag per slot. Every slot up to and including the first hidden one is
        shown; every slot after it is hidden and has its value reset to "".
        If no slot is hidden, all slots are reported as shown instead of
        falling through and returning None.
    """
    # 1-based position of the first hidden slot == number of slots shown afterwards.
    # When nothing is hidden there is nothing left to reveal, so all slots stay shown.
    n_shown = next(
        (i for i, visible in enumerate(visibility, 1) if not visible),
        MAX_INPUTS,
    )
    n_hidden = MAX_INPUTS - n_shown
    return (
        [gr.Textbox.update(visible=True)] * n_shown
        + [gr.Textbox.update(visible=False, value="")] * n_hidden
        + [1] * n_shown
        + [0] * n_hidden
    )


def remove_input(*visibility):
    """
    Hide the last shown input textbox.

    Params:
        - visibility: One flag per input slot, in display order. A truthy flag is treated as "shown".

    Returns:
        A list of `2 * MAX_INPUTS` values: one `gr.Textbox` update per slot followed by
        one 1/0 flag per slot. Slots before the last shown one stay shown; that slot and
        everything after it are hidden and have their value reset to "".
        If no slot is shown, all slots are reported as hidden instead of
        falling through and returning None.
    """
    # 1-based position of the last shown slot (0 when nothing is shown).
    last_shown = 0
    for i, visible in enumerate(visibility, 1):
        if visible:
            last_shown = i

    # Removing that slot leaves everything before it shown; never go below zero.
    n_shown = max(last_shown - 1, 0)
    n_hidden = MAX_INPUTS - n_shown
    return (
        [gr.Textbox.update(visible=True)] * n_shown
        + [gr.Textbox.update(visible=False, value="")] * n_hidden
        + [1] * n_shown
        + [0] * n_hidden
    )


def _is_task_row_fully_invisible(row: List[int]) -> bool:
    for visible in row:
        if bool(visible):
            return False
    return True


def add_task(index, *visibility):
    """
    Show a task box in the first task row that is completely hidden.

    Params:
        - index: Offset of the box to show inside the row (added to the row's start position).
        - visibility: Flat sequence of flags, `len(Task.AVAILABLE_TASKS)` consecutive flags per row.
          A truthy flag is treated as "shown".

    Returns:
        A list of `2 * len(visibility)` values: one `gr.Box` update per flag followed by one
        value per flag for the visibility numbers. Only the chosen position gets
        `visible=True` / `1`; every other position gets an argument-less update.
        If none of the first MAX_TASKS rows is free, every position gets an argument-less
        update instead of the function falling through and returning None.
    """
    visibility = list(visibility)
    n_avail_tasks = len(Task.AVAILABLE_TASKS)
    n_boxes = len(visibility)

    for i in range(MAX_TASKS):
        start_row = i * n_avail_tasks
        is_row_invisible = _is_task_row_fully_invisible(
            visibility[start_row : start_row + n_avail_tasks]
        )
        if is_row_invisible:
            unchanged_up_to = start_row + index
            n_after = n_boxes - unchanged_up_to - 1
            return (
                [gr.Box.update()] * unchanged_up_to
                + [gr.Box.update(visible=True)]
                + [gr.Box.update()] * n_after
                + [gr.Number.update()] * unchanged_up_to
                + [1]
                + [gr.Number.update()] * n_after
            )

    # Every row already has a shown box: keep the output count stable and change nothing.
    return [gr.Box.update()] * n_boxes + [gr.Number.update()] * n_boxes


def remove_task(*visibility):
    """
    Hide the last task row that currently has a shown box.

    Params:
        - visibility: Flat sequence of flags, `len(Task.AVAILABLE_TASKS)` consecutive flags per row.
          A truthy flag is treated as "shown".

    Returns:
        A list of `2 * len(visibility)` values: one `gr.Box` update per flag followed by one
        value per flag for the visibility numbers. Positions before the removed row get an
        argument-less update; the removed row and everything after it get
        `visible=False` / `0`.

    The row to remove is the one just before the first fully hidden row. When all
    MAX_TASKS rows are shown, the last row is removed; when even the first row is hidden,
    everything is simply reported as hidden. Both cases previously returned None or a
    list of the wrong length.
    """
    visibility = list(visibility)
    n_avail_tasks = len(Task.AVAILABLE_TASKS)
    n_boxes = len(visibility)

    # Index of the first fully hidden row; MAX_TASKS when every row is in use.
    first_hidden_row = next(
        (
            i
            for i in range(MAX_TASKS)
            if _is_task_row_fully_invisible(
                visibility[i * n_avail_tasks : (i + 1) * n_avail_tasks]
            )
        ),
        MAX_TASKS,
    )

    # Keep every row before the last shown one; clamp so an empty layout stays at 0.
    unchanged_up_to = max(first_hidden_row - 1, 0) * n_avail_tasks
    n_hidden = n_boxes - unchanged_up_to
    return (
        [gr.Box.update()] * unchanged_up_to
        + [gr.Box.update(visible=False)] * n_hidden
        + [gr.Number.update()] * unchanged_up_to
        + [0] * n_hidden
    )


def execute_task(id_: int, prev_error_value, n_task_inputs, *vars_in_scope):
    """
    Run one task after checking that every variable its inputs reference is defined.

    Params:
        - id_: This will tell us which task to execute (it selects the entry in `all_tasks`).
        - prev_error_value: I carry around whether there is an error in the execution, to be displayed at the end.
        - n_task_inputs: How many inputs does this task have?
        - vars_in_scope: All variables in scope, laid out in this order:
          a) the `n_task_inputs` task inputs, b) the `MAX_INPUTS` input variables,
          c) previous task outputs.

    Returns:
        A pair `(task_output, error_update)`:
        - If a task input references an undefined variable: `None` and a visible
          `gr.HighlightedText` update carrying the error message.
        - If the task has at least one non-empty input: the result of the task's
          `execute` call and an update that re-displays `prev_error_value`
          (visible only when it is not None).
        - Otherwise: `None` and that same pass-through update.
    """
    n_task_inputs = int(n_task_inputs)
    task_inputs = vars_in_scope[:n_task_inputs]
    # The input variables occupy MAX_INPUTS slots right after the task inputs, so the
    # slice must end at n_task_inputs + MAX_INPUTS (where the task outputs begin).
    input_vars = vars_in_scope[n_task_inputs : n_task_inputs + MAX_INPUTS]
    task_outputs = vars_in_scope[n_task_inputs + MAX_INPUTS :]
    non_empty_task_inputs = [ti for ti in task_inputs if ti]

    # Put all defined variables into a dict, with names (except task inputs).
    # Empty input variables are left out, so referencing them counts as undefined.
    defined_vars = {
        f"{Input.VNAME}{i}": input_ for i, input_ in enumerate(input_vars) if input_
    }
    defined_vars.update(
        {f"{Task.VNAME}{i}": task_output for i, task_output in enumerate(task_outputs)}
    )
    # Get all variables referenced within the task inputs, i.e. every `{name}` placeholder
    prompt_vars = {
        v for ti in non_empty_task_inputs for v in re.findall(r"{(.*?)}", ti)
    }

    # If there is an undefined variable referenced, HighlightedText will signal the error.
    undefined_vars = prompt_vars - defined_vars.keys()
    if undefined_vars:
        return None, gr.HighlightedText.update(
            value=[
                (
                    f"The following variables are being used before being defined :: {undefined_vars}. Please check your tasks.",
                    "ERROR",
                )
            ],
            visible=True,
        )
    error_update = gr.HighlightedText.update(
        value=prev_error_value, visible=prev_error_value is not None
    )

    if non_empty_task_inputs:
        # Execute the task logic
        return (
            all_tasks[id_].execute(*non_empty_task_inputs, defined_vars),
            error_update,
        )

    # There is no action for this task.
    return None, error_update