from ultralyticsplus import YOLO
from typing import Optional, Union, Annotated
from scipy.spatial import distance as dist
import time
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import StreamingResponse
from fastapi.middleware.gzip import GZipMiddleware
from io import BytesIO
from utils import tts, stt, read_image_file, pil_to_base64, base64_to_pil, get_hist, ffmpeg_read
import zipfile
import soundfile as sf
import openai
import os
import random

# Config for camera picture
model = YOLO('ultralyticsplus/yolov8s')
# model = YOLO('kadirnar/yolov8n-v8.0')
CLASS = model.model.names
ZIP = False
# bot_voice_time = "おはようございます"  # "Good morning"
bot_voice_time = "こんにちは"  # "Hello"
# Greeting candidates (Japanese); one is picked at random per detection.
# They say, roughly: "Hello, I am Detomo / AISA Robo; I support IT consulting,
# system development, offshore development, or edge computing solutions."
default_bot_voice_list = [
    f"{bot_voice_time}、アイティコンサルティングとシステム開発を支援します。よろしくお願いします。",
    f"{bot_voice_time}、デトモです。システム開発全般を支援します。",
    f"{bot_voice_time}、デトモです。オフショア開発全般を支援します。",
    f"{bot_voice_time}、私はアイサロボです。システム開発全般を支援します。",
    f"{bot_voice_time}、エッジコンピューティングソリューションを提供します。",
]
area_threshold = 0
diff_value_threshold = 0

# Config for human input
# System prompt (Japanese). Roughly: "Behave like a robot made by Detomo.
# Detomo supports an advanced digital society with consulting services that
# propose solutions to business problems, system development services that
# realize those solutions, and outsourcing of corporate IT-department work.
# Your name is AISA Robo. Your mission is to help children gain the confidence
# to greet other children and be happy. Answer questions simply, and do not
# give extra information unless explicitly asked."
prompt_template = "私はあなたに、Detomo社が作ったロボットのように振る舞ってほしいです。デトモは高度なデジタル化社会を支えます。"\
                  "ビジネスの課題解決策を提案するコンサルティング・サービスと、課題解決を実現するシステムの開発サービス、また、企業内の情報システム部門の業務の代行サービスにも対応しています。"\
                  "デトモはITコンサルティング・システム開発を得意とし、お客様の課題解決をお手伝いいたします。"\
                  "あなたの名前はアイサロボです。"\
                  "あなたのミッションは、子供たちが他の子供たちに挨拶する自信を持ち、幸せになることを助けることです。"\
                  "質問には簡単な方法でしか答えないようにし、明示的に要求されない限り、追加情報を提供しないでください。"
system_prompt = [{"role": "system", "content": prompt_template}]
openai.api_key = os.environ["OPENAI_API_KEY"]

app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)
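

# `get_hist` above is imported from utils, whose source is not shown in this
# file. The following is purely a hypothetical sketch of a compatible helper
# (assuming a normalized PIL RGB histogram, compared later with scipy's
# Euclidean distance in camera_picture_api); it is not used anywhere:
def _get_hist_sketch(image):
    import numpy as np
    # Image.histogram() returns 256 bins per channel, concatenated.
    hist = np.asarray(image.histogram(), dtype=np.float32)
    # Normalize so the distance is insensitive to crop size.
    return hist / (hist.sum() + 1e-8)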

@app.get("/")  # route decorator restored; exact path is an assumption
def read_root():
    return {"Message": "Application startup complete"}


@app.get("/client_settings/")  # route path is an assumption
def client_settings_api():
    return {"camera_picture_period": 5}

@app.post("/camera_picture/")  # route path is an assumption
async def camera_picture_api(
        file: UploadFile = File(...),
        last_seen: Optional[Union[str, UploadFile]] = Form(None),
        return_voice: Annotated[bool, Form()] = True,
):
    # parameters
    total_time = time.time()
    most_close = 0
    out_img = None
    diff_value = 0.5
    default_bot_voice = random.choice(default_bot_voice_list)
    # read image and predict
    image = read_image_file(await file.read())
    results = model.predict(image, show=False)[0]
    masks, boxes = results.masks, results.boxes
    area_image = image.width * image.height
    # select and crop the largest person detection (class 0) relative to frame
    if boxes is not None:
        for xyxy, conf, cls in zip(boxes.xyxy, boxes.conf, boxes.cls):
            if int(cls) != 0:
                continue
            box = xyxy.tolist()
            area_rate = (box[2] - box[0]) * (box[3] - box[1]) / area_image
            if area_rate >= most_close:
                out_img = image.crop(tuple(box)).resize((64, 64))
                most_close = area_rate
    # check whether anyone was detected
    if out_img is None:
        return {
            "status": "No face detected",
            "text": None,
            "voice": None,
            "image": None
        }
    else:
        if ZIP:
            image_bot_path = pil_to_base64(out_img, encode=False)
        else:
            image_bot_path = pil_to_base64(out_img, encode=True)
        # compare with the previous crop, if one was provided
        if last_seen is not None:
            if isinstance(last_seen, str):
                last_seen = base64_to_pil(last_seen)
            else:
                last_seen = read_image_file(await last_seen.read())
            diff_value = dist.euclidean(get_hist(out_img), get_hist(last_seen))
        print(f"Area rate: {most_close}. Diff value: {diff_value}")
        # return results
        if most_close >= area_threshold and diff_value >= diff_value_threshold:
            if ZIP:
                voice_bot_path = tts(default_bot_voice, language="ja", encode=False)
                io = BytesIO()
                zip_filename = "final_archive.zip"
                # the with-block closes the archive; no explicit close needed
                with zipfile.ZipFile(io, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
                    for file_path in [voice_bot_path, image_bot_path]:
                        zf.write(file_path)
                print("Total time", time.time() - total_time)
                return StreamingResponse(
                    iter([io.getvalue()]),
                    media_type="application/x-zip-compressed",
                    headers={"Content-Disposition": f"attachment;filename={zip_filename}"}
                )
            else:
                if return_voice:
                    print("Total time", time.time() - total_time)
                    return {
                        "status": "New people",
                        "text": default_bot_voice,
                        "voice": tts(default_bot_voice, language="ja", encode=True),
                        "image": image_bot_path
                    }
                else:
                    print("Total time", time.time() - total_time)
                    return {
                        "status": "New people",
                        "text": default_bot_voice,
                        "voice": None,
                        "image": image_bot_path
                    }
        elif most_close < area_threshold:
            print("Total time", time.time() - total_time)
            return {
                "status": "People far from camera",
                "text": None,
                "voice": None,
                "image": image_bot_path,
            }
        else:
            print("Total time", time.time() - total_time)
            return {
                "status": "Old people",
                "text": None,
                "voice": None,
                "image": image_bot_path,
            }
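

# Hypothetical client-side sketch (illustrative only, never called here):
# poll the camera endpoint every `camera_picture_period` seconds and echo the
# returned crop back as `last_seen`, so the server can tell a new visitor from
# the person it just greeted. Host, port, and route are assumptions.
def _example_client_loop():
    import requests

    last_seen = None
    while True:
        with open("frame.jpg", "rb") as f:
            data = {"return_voice": "true"}
            if last_seen is not None:
                data["last_seen"] = last_seen
            resp = requests.post("http://localhost:7860/camera_picture/",
                                 files={"file": f}, data=data).json()
        if resp.get("image"):
            last_seen = resp["image"]
        time.sleep(5)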

@app.post("/human_input/")  # route path is an assumption
async def human_input_api(
        voice_input: bytes = File(None),
        text_input: str = Form(None),
        temperature: Annotated[float, Form()] = 0.7,
        max_tokens: Annotated[int, Form()] = 1000,
        return_voice: Annotated[bool, Form()] = False,
):
    if text_input:
        text = text_input
    elif text_input is None and voice_input is not None:
        # decode the uploaded audio, write a temporary WAV, then transcribe
        upload_audio = ffmpeg_read(voice_input, sampling_rate=24000)
        sf.write('temp.wav', upload_audio, 24000, subtype='PCM_16')
        text = stt('temp.wav')
        print(text)
    else:
        # neither text nor voice was supplied
        if return_voice:
            return {
                "human_text": None,
                "robot_text": None,
                "robot_voice": None
            }
        else:
            return {
                "human_text": None,
                "robot_text": None,
            }
    prompt_msg = {"role": "user", "content": text}
    messages = system_prompt + [prompt_msg]
    completion = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=messages,
                                              temperature=temperature, max_tokens=max_tokens)
    print(completion['usage']['total_tokens'])
    if return_voice:
        return {
            "human_text": text,
            "robot_text": completion.choices[0].message.content,
            "robot_voice": tts(completion.choices[0].message.content, language="ja", encode=True)
        }
    else:
        return {
            "human_text": text,
            "robot_text": completion.choices[0].message.content,
        }
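

# Local entry point (assumption: served directly with uvicorn on the Hugging
# Face Spaces default port; adjust module name, host, and port as needed).
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)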