from ultralyticsplus import YOLO
from typing import Optional, Union, Annotated
from scipy.spatial import distance as dist
import time
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import StreamingResponse
from fastapi.middleware.gzip import GZipMiddleware
from io import BytesIO
from utils import tts, stt, read_image_file, pil_to_base64, base64_to_pil, get_hist, ffmpeg_read
import zipfile
import soundfile as sf
import openai
import os
import random
# Config for camera picture
model = YOLO('ultralyticsplus/yolov8s')
# model = YOLO('kadirnar/yolov8n-v8.0')
CLASS = model.model.names
ZIP = False
# bot_voice_time = "おはようございます"  # "Good morning"
bot_voice_time = "こんにちは"  # "Hello"
# Greeting candidates; one is picked at random per detection. They introduce
# the robot (アイサロボ / Detomo) and the company's IT consulting, system
# development, offshore development, and edge computing services.
default_bot_voice_list = [f"{bot_voice_time}、アイティコンサルティングとシステム開発を支援します。よろしくお願いします。",
                          f"{bot_voice_time}、デトモです。システム開発全般を支援します。",
                          f"{bot_voice_time}、デトモです。オフショア開発全般を支援します。",
                          f"{bot_voice_time}、私はアイサロボです。システム開発全般を支援します。",
                          f"{bot_voice_time}、エッジコンピューティングソリューションを提供します。"]
area_threshold = 0
diff_value_threshold = 0
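# Note: with both thresholds left at 0, every detected person passes the
# "close enough" and "changed since last_seen" gates in camera_picture_api,
# so any detection is currently treated as a new person to greet.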
# Config for human input
# System prompt (Japanese). In short: "Act like a robot made by Detomo, a
# company supporting an advanced digital society with IT consulting, system
# development, and IT-department outsourcing services. Your name is アイサロボ
# (Aisa-Robo); your mission is to help children gain confidence greeting other
# children. Answer simply and add no extra information unless explicitly asked."
prompt_template = "私はあなたに、Detomo社が作ったロボットのように振る舞ってほしいです。デトモは高度なデジタル化社会を支えます。"\
                  "ビジネスの課題解決策を提案するコンサルティング・サービスと、課題解決を実現するシステムの開発サービス、また、企業内の情報システム部門の業務の代行サービスにも対応しています。"\
                  "デトモはITコンサルティング・システム開発を得意とし、お客様の課題解決をお手伝いいたします。"\
                  "あなたの名前はアイサロボです。"\
                  "あなたのミッションは、子供たちが他の子供たちに挨拶する自信を持ち、幸せになることを助けることです。"\
                  "質問には簡単な方法でしか答えないようにし、明示的に要求されない限り、追加情報を提供しないでください。"
system_prompt = [{"role": "system", "content": prompt_template}]
openai.api_key = os.environ["OPENAI_API_KEY"]
app = FastAPI()
# gzip-compress responses larger than 1000 bytes (the base64 payloads benefit)
app.add_middleware(GZipMiddleware, minimum_size=1000)


@app.get("/")  # route decorator missing in the original snippet; path assumed
def read_root():
    return {"Message": "Application startup complete"}


@app.get("/client_settings")  # route decorator missing in the original snippet; path assumed
def client_settings_api():
    return {"camera_picture_period": 5}


@app.post("/camera_picture")  # route decorator missing in the original snippet; path assumed
async def camera_picture_api(
    file: UploadFile = File(...),
    last_seen: Optional[Union[str, UploadFile]] = Form(None),
    return_voice: Annotated[bool, Form()] = True,
):
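    """Detect the closest person in the uploaded frame and greet them.

    The largest person box is cropped and its colour histogram is compared
    with `last_seen` to decide whether a new person has appeared. Returns a
    random Japanese greeting as text (plus TTS audio when `return_voice` is
    true), or a ZIP of the audio/image files when ZIP mode is enabled.
    """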
    # parameters
    total_time = time.time()
    most_close = 0
    out_img = None
    diff_value = 0.5
    default_bot_voice = random.choice(default_bot_voice_list)
    # read image and predict
    image = read_image_file(await file.read())
    results = model.predict(image, show=False)[0]
    boxes = results.boxes  # results.masks is unused for this detection model
    area_image = image.width * image.height
    # select and crop the largest person box (class 0 = "person" in COCO)
    if boxes is not None:
        for xyxy, conf, cls in zip(boxes.xyxy, boxes.conf, boxes.cls):
            if int(cls) != 0:
                continue
            box = xyxy.tolist()
            area_rate = (box[2] - box[0]) * (box[3] - box[1]) / area_image
            if area_rate >= most_close:
                out_img = image.crop(tuple(box)).resize((64, 64))
                most_close = area_rate
    # no person detected in the frame
    if out_img is None:
        return {
            "status": "No face detected",
            "text": None,
            "voice": None,
            "image": None
        }
    # encode=False is expected to yield a file path (it is passed to zf.write
    # in the ZIP branch below); encode=True yields a base64 string for JSON
    image_bot_path = pil_to_base64(out_img, encode=not ZIP)
    # compare with the previous crop, if one was provided
    if last_seen is not None:
        if isinstance(last_seen, str):
            last_seen = base64_to_pil(last_seen)
        else:
            last_seen = read_image_file(await last_seen.read())
        # Euclidean distance between colour histograms: a cheap check for
        # whether the current person differs from the one greeted last
        diff_value = dist.euclidean(get_hist(out_img), get_hist(last_seen))
    print(f"Area rate: {most_close}. Difference value: {diff_value}")
    # return results
    if most_close >= area_threshold and diff_value >= diff_value_threshold:
        if ZIP:
            voice_bot_path = tts(default_bot_voice, language="ja", encode=False)
            zip_buffer = BytesIO()
            zip_filename = "final_archive.zip"
            with zipfile.ZipFile(zip_buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
                # both files are added to the archive by their paths
                for file_path in [voice_bot_path, image_bot_path]:
                    zf.write(file_path)
            print("Total time", time.time() - total_time)
            return StreamingResponse(
                iter([zip_buffer.getvalue()]),
                media_type="application/x-zip-compressed",
                headers={"Content-Disposition": f"attachment; filename={zip_filename}"}
            )
        else:
            print("Total time", time.time() - total_time)
            return {
                "status": "New people",
                "text": default_bot_voice,
                "voice": tts(default_bot_voice, language="ja", encode=True) if return_voice else None,
                "image": image_bot_path
            }
    elif most_close < area_threshold:
        print("Total time", time.time() - total_time)
        return {
            "status": "People far from camera",
            "text": None,
            "voice": None,
            "image": image_bot_path,
        }
    else:
        # histogram distance below threshold: same person as last_seen
        print("Total time", time.time() - total_time)
        return {
            "status": "Old people",
            "text": None,
            "voice": None,
            "image": image_bot_path,
        }


@app.post("/human_input")  # route decorator missing in the original snippet; path assumed
async def human_input_api(
    voice_input: bytes = File(None),
    text_input: str = Form(None),
    temperature: Annotated[float, Form()] = 0.7,
    max_tokens: Annotated[int, Form()] = 1000,
    return_voice: Annotated[bool, Form()] = False,
):
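    """Answer typed or spoken input with gpt-3.5-turbo.

    Text input takes priority; otherwise the uploaded audio is decoded to
    24 kHz PCM, transcribed with `stt`, and sent to the chat model. The reply
    can optionally be synthesized as Japanese TTS audio.
    """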
    if text_input:
        text = text_input
    elif text_input is None and voice_input is not None:
        # decode the uploaded audio to 24 kHz PCM and transcribe it
        upload_audio = ffmpeg_read(voice_input, sampling_rate=24000)
        sf.write('temp.wav', upload_audio, 24000, subtype='PCM_16')
        text = stt('temp.wav')
        print(text)
    else:
        # no usable input was provided
        if return_voice:
            return {
                "human_text": None,
                "robot_text": None,
                "robot_voice": None
            }
        else:
            return {
                "human_text": None,
                "robot_text": None,
            }
    prompt_msg = {"role": "user", "content": text}
    messages = system_prompt + [prompt_msg]
    # legacy (pre-1.0) openai SDK interface
    completion = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=messages, temperature=temperature,
                                              max_tokens=max_tokens)
    print(completion['usage']['total_tokens'])
    robot_text = completion.choices[0].message.content
    if return_voice:
        return {
            "human_text": text,
            "robot_text": robot_text,
            "robot_voice": tts(robot_text, language="ja", encode=True)
        }
    else:
        return {
            "human_text": text,
            "robot_text": robot_text,
        }
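

# Local entry point: a minimal sketch, not part of the original snippet.
# Host and port are assumptions (7860 is the conventional Hugging Face
# Spaces port); adjust to your deployment.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)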