Spaces:
Paused
Paused
File size: 5,780 Bytes
b929c63 88ee7ec b929c63 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
import gradio as gr
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
import os
import copy
import re
import secrets
from pathlib import Path
from pydub import AudioSegment
# Initialize the model and tokenizer
torch.manual_seed(420)
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen-Audio-Chat", trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen-Audio-Chat", device_map="cuda", trust_remote_code=True).eval()
def _parse_text(text):
lines = text.split("\n")
lines = [line for line in lines if line != ""]
count = 0
for i, line in enumerate(lines):
if "```" in line:
count += 1
items = line.split("`")
if count % 2 == 1:
lines[i] = f'<pre><code class="language-{items[-1]}">'
else:
lines[i] = f"<br></code></pre>"
else:
if i > 0:
if count % 2 == 1:
line = line.replace("`", r"\`")
line = line.replace("<", "<")
line = line.replace(">", ">")
line = line.replace(" ", " ")
line = line.replace("*", "*")
line = line.replace("_", "_")
line = line.replace("-", "-")
line = line.replace(".", ".")
line = line.replace("!", "!")
line = line.replace("(", "(")
line = line.replace(")", ")")
line = line.replace("$", "$")
lines[i] = "<br>" + line
text = "".join(lines)
return text
def predict(_chatbot, task_history):
if not task_history:
return _chatbot
query = task_history[-1][0]
history_cp = copy.deepcopy(task_history)
history_filter = []
audio_idx = 1
pre = ""
last_audio = None
for i, (q, a) in enumerate(history_cp):
if isinstance(q, (tuple, list)):
last_audio = q[0]
q = f'Audio {audio_idx}: <audio>{q[0]}</audio>'
pre += q + '\n'
audio_idx += 1
else:
pre += q
history_filter.append((pre, a))
pre = ""
history, message = history_filter[:-1], history_filter[-1][0]
response, history = model.chat(tokenizer, message, history=history)
ts_pattern = r"<\|\d{1,2}\.\d+\|>"
all_time_stamps = re.findall(ts_pattern, response)
if (len(all_time_stamps) > 0) and (len(all_time_stamps) % 2 ==0) and last_audio:
ts_float = [ float(t.replace("<|","").replace("|>","")) for t in all_time_stamps]
ts_float_pair = [ts_float[i:i + 2] for i in range(0,len(all_time_stamps),2)]
# θ―»ει³ι’ζδ»Ά
format = os.path.splitext(last_audio)[-1].replace(".","")
audio_file = AudioSegment.from_file(last_audio, format=format)
chat_response_t = response.replace("<|", "").replace("|>", "")
chat_response = chat_response_t
temp_dir = secrets.token_hex(20)
temp_dir = Path(uploaded_file_dir) / temp_dir
temp_dir.mkdir(exist_ok=True, parents=True)
# ζͺει³ι’ζδ»Ά
for pair in ts_float_pair:
audio_clip = audio_file[pair[0] * 1000: pair[1] * 1000]
# δΏει³ι’ζδ»Ά
name = f"tmp{secrets.token_hex(5)}.{format}"
filename = temp_dir / name
audio_clip.export(filename, format=format)
_chatbot[-1] = (_parse_text(query), chat_response)
_chatbot.append((None, (str(filename),)))
else:
_chatbot[-1] = (_parse_text(query), response)
full_response = _parse_text(response)
task_history[-1] = (query, full_response)
print("Qwen-Audio-Chat: " + _parse_text(full_response))
return _chatbot
def regenerate(_chatbot, task_history):
if not task_history:
return _chatbot
item = task_history[-1]
if item[1] is None:
return _chatbot
task_history[-1] = (item[0], None)
chatbot_item = _chatbot.pop(-1)
if chatbot_item[0] is None:
_chatbot[-1] = (_chatbot[-1][0], None)
else:
_chatbot.append((chatbot_item[0], None))
return predict(_chatbot, task_history)
def add_text(history, task_history, text):
history = history + [(_parse_text(text), None)]
task_history = task_history + [(text, None)]
return history, task_history, ""
def add_file(history, task_history, file):
history = history + [((file.name,), None)]
task_history = task_history + [((file.name,), None)]
return history, task_history
def add_mic(history, task_history, file):
if file is None:
return history, task_history
os.rename(file, file + '.wav')
print("add_mic file:", file)
print("add_mic history:", history)
print("add_mic task_history:", task_history)
# history = history + [((file.name,), None)]
# task_history = task_history + [((file.name,), None)]
task_history = task_history + [((file + '.wav',), None)]
history = history + [((file + '.wav',), None)]
print("task_history", task_history)
return history, task_history
def reset_user_input():
return gr.update(value="")
def reset_state(task_history):
task_history.clear()
return []
iface = gr.Interface(
fn=predict,
inputs=[
gr.inputs.Audio(label="Audio Input"),
gr.inputs.Textbox(label="Text Query"),
gr.State()
],
outputs=[
"text",
gr.State()
],
title="Audio-Text Interaction Model",
description="This model can process an audio input along with a text query and provide a response.",
theme="default",
allow_flagging="never"
)
iface.launch() |