File size: 5,491 Bytes
6fec0c8
 
 
 
dbb14b6
6fec0c8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
482bc3b
 
 
 
6fec0c8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dbb14b6
6fec0c8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dbb14b6
 
 
 
 
 
c89b357
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""Tools to handle multimodal understandig."""

import os
import io
import re
import requests

import librosa
import soundfile as sf
import pandas as pd
from llama_index.core.tools import FunctionTool
from huggingface_hub import InferenceClient
from transformers import pipeline

# Base URL that file ids are resolved against: this module builds download
# URLs as DEFAULT_API_URL + "/files/<id>".
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

def transcribe_audio(file_id: str) -> str:
    """
    Transcribes an English audio file identified by its id.

    The file is downloaded with ``_get_file``, decoded with ``soundfile``,
    down-mixed to mono, resampled to 16 kHz if necessary and then passed to
    the ``openai/whisper-tiny`` speech-recognition pipeline.

    Args:
        file_id: Id of the file to download and transcribe.

    Returns:
        The stripped transcription text, or an error message string when the
        file cannot be downloaded or decoded as audio.
    """
    try:
        audio, sr = sf.read(_get_file(file_id))
        # soundfile returns a (frames, channels) array for multi-channel
        # files. Average the channels so resampling runs along the time axis
        # and the pipeline receives a single-channel 1-D signal.
        if audio.ndim > 1:
            audio = audio.mean(axis=1)
        if sr != 16000:
            audio = librosa.resample(audio, orig_sr=sr, target_sr=16000)
    except Exception:
        # Best-effort tool boundary: any download/decoding failure is reported
        # to the caller as text (KeyboardInterrupt/SystemExit still propagate).
        return "Error: Invalid file. This file is either not an audio file or the id does not exist."

    asr = pipeline("automatic-speech-recognition", model="openai/whisper-tiny")

    # Clips longer than 25 s are transcribed with timestamps enabled
    # (presumably to switch Whisper to long-form decoding - confirm against
    # the transformers documentation).
    if (len(audio) / 16000) > 25:
        output = asr(audio, return_timestamps=True)
    else:
        output = asr(audio)

    return output["text"].strip()

def transcribe_audio_hf(file_id: str) -> str:
    """
    Transcribes an audio file identified by its id via the HF inference API.

    The raw file bytes are downloaded with ``_get_file`` and sent to the
    ``openai/whisper-small`` model through ``InferenceClient``; the API key is
    read from the ``HF_TOKEN`` environment variable.

    Args:
        file_id: Id of the file to download and transcribe.

    Returns:
        The transcription text, or an error message string when the file
        cannot be downloaded.
    """
    try:
        audio_bytes = _get_file(file_id).read()
    except Exception:
        # Best-effort tool boundary: report download failures as text.
        return "Error: Invalid file. This file is either not an audio file or the id does not exist."

    client = InferenceClient(
        provider="hf-inference",
        api_key=os.getenv("HF_TOKEN"),
    )

    output = client.automatic_speech_recognition(audio_bytes, model="openai/whisper-small")
    # The client returns a structured output object carrying the transcript in
    # ``.text``; return only the text so the declared ``str`` type holds.
    if isinstance(output, str):
        return output
    return output.text

def get_transcription_tool():
    """Build the ``FunctionTool`` that exposes ``transcribe_audio``."""
    tool = FunctionTool.from_defaults(
        description="Transcribes an audio file identified by its id.",
        fn=transcribe_audio,
    )
    return tool


def answer_image_question(question: str, file_id: str) -> str:
    """
    Answers a question about an image identified by its id.

    Sends one user message (the question text plus the image URL built from
    ``DEFAULT_API_URL`` and ``file_id``) to ``Qwen/Qwen2.5-VL-32B-Instruct``
    through ``InferenceClient`` and returns the reply after ``remove_think``.

    Args:
        question: The question to ask about the image.
        file_id: Id of the image file.

    Returns:
        The model's answer as processed by ``remove_think``.
    """
    text_part = {"type": "text", "text": question}
    image_part = {
        "type": "image_url",
        "image_url": {"url": f"{DEFAULT_API_URL}/files/{file_id}"},
    }
    user_message = {"role": "user", "content": [text_part, image_part]}

    hf_client = InferenceClient(
        provider="hf-inference",
        api_key=os.getenv("HF_TOKEN"),
    )
    response = hf_client.chat.completions.create(
        model="Qwen/Qwen2.5-VL-32B-Instruct",
        messages=[user_message],
        max_tokens=512,
    )

    raw_answer = response.choices[0].message.content
    return remove_think(raw_answer)

def get_image_qa_tool():
    """Build the ``FunctionTool`` that exposes ``answer_image_question``."""
    tool = FunctionTool.from_defaults(
        description="Answer a question about a given image. The image is identified by a file id.",
        fn=answer_image_question,
    )
    return tool

def read_excel(file_id: str) -> str:
    """Download the file with the given id, load it with ``pd.read_excel`` and return it as markdown."""
    return pd.read_excel(_get_file(file_id)).to_markdown()

def get_excel_tool():
    """Build the ``FunctionTool`` that exposes ``read_excel``."""
    tool = FunctionTool.from_defaults(
        description="Convert an excel file that is identified by its file id into a markdown string.",
        fn=read_excel,
    )
    return tool

def analyse_excel(file_id: str) -> str:
    """
    Return summary statistics of an Excel file as a markdown table.

    Args:
        file_id: Id of the Excel file to download via ``_get_file``.

    Returns:
        The output of ``DataFrame.describe()`` rendered with ``to_markdown()``,
        so the result is a string as the signature declares (matching
        ``read_excel``) rather than a DataFrame.
    """
    file_io = _get_file(file_id)
    df = pd.read_excel(file_io)
    return df.describe().to_markdown()

def get_excel_analysis_tool():
    """Build the ``FunctionTool`` that exposes ``analyse_excel``."""
    return FunctionTool.from_defaults(
        # Wire the statistics function; the tool previously pointed at
        # ``read_excel`` and so never produced the advertised analysis.
        fn=analyse_excel,
        description="Analyse an excel file that is identified by its file id and get common statistics such as mean or max per column."
    )

def read_csv(file_id: str) -> str:
    """Download the file with the given id, load it with ``pd.read_csv`` and return it as markdown."""
    return pd.read_csv(_get_file(file_id)).to_markdown()

def get_csv_tool():
    """Build the ``FunctionTool`` that exposes ``read_csv``."""
    return FunctionTool.from_defaults(
        # Wire the CSV reader; the tool previously pointed at ``read_excel``,
        # which parses the download as an Excel workbook instead of CSV.
        fn=read_csv,
        description="Convert a csv file that is identified by its file id into a markdown string."
    )

def analyse_csv(file_id: str) -> str:
    """
    Return summary statistics of a CSV file as a markdown table.

    Args:
        file_id: Id of the CSV file to download via ``_get_file``.

    Returns:
        The output of ``DataFrame.describe()`` rendered with ``to_markdown()``,
        so the result is a string as the signature declares (matching
        ``read_csv``) rather than a DataFrame.
    """
    file_io = _get_file(file_id)
    df = pd.read_csv(file_io)
    return df.describe().to_markdown()

def get_csv_analysis_tool():
    """Build the ``FunctionTool`` that exposes ``analyse_csv``."""
    return FunctionTool.from_defaults(
        # Wire the CSV statistics function; the tool previously pointed at
        # ``read_excel`` and so neither read CSV nor computed statistics.
        fn=analyse_csv,
        description="Analyse a csv file that is identified by its file id and get common statistics such as mean or max per column."
    )

def watch_video(video_url: str) -> str:
    """Placeholder video tool: ignores ``video_url`` and returns a fixed notice."""
    notice = "You are not able to watch a Video yet. Reply with 'I don't know' to the question."
    return notice

def get_video_tool():
    """Build the ``FunctionTool`` that exposes ``watch_video``."""
    tool = FunctionTool.from_defaults(
        description="Watch a video and get a content description as a string.",
        fn=watch_video,
    )
    return tool

def _get_file(task_id: str, timeout: float = 30.0) -> io.BytesIO:
    """
    Download the file belonging to a task/file id into memory.

    Args:
        task_id: Id appended to ``DEFAULT_API_URL + "/files/"``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests.get`` can block indefinitely on a stalled
            connection.

    Returns:
        An ``io.BytesIO`` positioned at offset 0 holding the response body.

    Raises:
        FileNotFoundError: If the server answers with a status other than 200.
        requests.RequestException: On connection errors or timeouts.
    """
    res = requests.get(f"{DEFAULT_API_URL}/files/{task_id}", timeout=timeout)
    if res.status_code != 200:
        raise FileNotFoundError("Invalid file or task id.")
    return io.BytesIO(res.content)

def remove_think(output: str) -> str:
    """
    Removes every ``<think>...</think>`` section from an LLM output.

    Args:
        output: Raw model output; may be empty or ``None``.

    Returns:
        The text with all think sections removed and surrounding whitespace
        stripped. Falsy inputs (``""``, ``None``) are returned unchanged.
    """
    if not output:
        return output
    # DOTALL lets ``.`` span newlines, since think sections may cover several
    # lines. The non-greedy ``.*?`` stops at the first closing tag, so text
    # between two separate think sections is preserved.
    return re.sub(r"<think>.*?</think>", "", output, flags=re.DOTALL).strip()

def read_txt_or_py_file(file_id: str) -> str:
    """
    Read a python or txt file as plain text and return its content.

    Args:
        file_id: Id of the file to download via ``_get_file``.

    Returns:
        The decoded file content, or an error message string when the file
        cannot be downloaded or its bytes cannot be decoded as text.
    """
    try:
        # Decoding sits inside the try block so a non-text (binary) download
        # yields the error message instead of an uncaught UnicodeDecodeError.
        return _get_file(file_id).read().decode()
    except Exception:
        # Best-effort tool boundary: report failures as text.
        return "Error: Invalid file. This file is either not a .py/.txt file or the id does not exist."

def get_read_file_tool():
    """Build the ``FunctionTool`` that exposes ``read_txt_or_py_file``."""
    tool = FunctionTool.from_defaults(
        description="Read a python or txt file as plain text and return its content.",
        fn=read_txt_or_py_file,
    )
    return tool