import io
import json
import os
import random
import string
import subprocess
import time
from datetime import date, datetime
from typing import Annotated

import requests
from fastapi import Depends, FastAPI, File, Form, Request, Response, UploadFile
from fastapi.exceptions import HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from huggingface_hub import InferenceClient
from pydantic import BaseModel


# Request body for the image-generation endpoints.
class Query(BaseModel):
    text: str


class Query2(BaseModel):
    text: str


# Request body for the text-generation endpoints: prompt plus sampling parameters.
class QueryM(BaseModel):
    text: str
    tokens: int
    temp: float
    topp: float
    topk: float
app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)
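# Usage note (assumption: this module is saved as main.py, the usual entry point for
# a FastAPI-based Hugging Face Space): the app can be served locally with
#   uvicorn main:app --host 0.0.0.0 --port 7860
# where 7860 is the port Hugging Face Spaces expects the container to listen on.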
# cred = credentials.Certificate('key.json')
# app1 = firebase_admin.initialize_app(cred)
# db = firestore.client()
# data_frame = pd.read_csv('data.csv')
# NOTE: decorator assumed; the original decorators were lost in extraction.
@app.on_event("startup")
async def startup_event():
    print("on startup")
    # requests.get("https://audiospace-1-u9912847.deta.app/sendcode")


audio_space = "https://audiospace-1-u9912847.deta.app/uphoto"
import threading
from huggingface_hub.inference_api import InferenceApi

# Shared client used by the image endpoints below.
client = InferenceClient()
# NOTE: the route paths below ("/image", "/mistral", ...) are assumptions added to
# make the file runnable; the Space's original routes were not recoverable.
@app.post("/image")
async def get_answer(q: Query):
    text = q.text
    try:
        global client
        imagei = client.text_to_image(text)
        byte_array = io.BytesIO()
        imagei.save(byte_array, format='JPEG')
        # Encoded as JPEG above, so serve it with a matching media type.
        response = Response(content=byte_array.getvalue(), media_type="image/jpeg")
        return response
    except Exception as e:
        print(e)
        return JSONResponse({"status": False})
# Mistral-7B-Instruct text generation with basic sampling (route path assumed).
@app.post("/mistral")
async def get_answer(q: QueryM):
    text = q.text
    try:
        client = InferenceClient()
        generate_kwargs = dict(
            max_new_tokens=int(q.tokens),
            do_sample=True,
            top_p=q.topp,
            top_k=int(q.topk),
            temperature=q.temp,
        )
        inputs = text
        response = client.post(json={"inputs": inputs, "parameters": generate_kwargs}, model="mistralai/Mistral-7B-Instruct-v0.1")
        json_string = response.decode('utf-8')
        list_of_dicts = json.loads(json_string)
        result_dict = list_of_dicts[0]
        x = result_dict['generated_text']
        # Remove the echoed prompt from the completion before returning it.
        x = x.replace(inputs, '')
        return JSONResponse({"result": x, "status": True})
    except Exception as e:
        print(e)
        return JSONResponse({"status": False})
# Mistral-7B-Instruct generation with repetition penalty and stop sequences (route path assumed).
@app.post("/mistral2")
async def get_answer(q: QueryM):
    text = q.text
    try:
        client = InferenceClient()
        generate_kwargs = dict(
            max_new_tokens=int(q.tokens),
            repetition_penalty=1.0,
            top_p=q.topp,
            top_k=int(q.topk),
            temperature=q.temp,
            stop=["</s>", "<|>"],
        )
        inputs = text
        response = client.post(json={"inputs": inputs, "parameters": generate_kwargs}, model="mistralai/Mistral-7B-Instruct-v0.1")
        json_string = response.decode('utf-8')
        list_of_dicts = json.loads(json_string)
        result_dict = list_of_dicts[0]
        x = result_dict['generated_text']
        x = x.replace(inputs, '')
        return JSONResponse({"result": x, "status": True})
    except Exception as e:
        print(e)
        return JSONResponse({"status": False})
# OpenChat 3.5 text generation (route path assumed).
@app.post("/openchat")
async def get_answer(q: QueryM):
    text = q.text
    try:
        client = InferenceClient()
        generate_kwargs = dict(
            max_new_tokens=int(q.tokens),
            repetition_penalty=1.0,
            top_p=q.topp,
            top_k=int(q.topk),
            temperature=q.temp,
            stop=["<|end_of_turn|>", "<??|>"],
        )
        inputs = text
        response = client.post(json={"inputs": inputs, "parameters": generate_kwargs}, model="openchat/openchat_3.5")
        json_string = response.decode('utf-8')
        list_of_dicts = json.loads(json_string)
        result_dict = list_of_dicts[0]
        x = result_dict['generated_text']
        x = x.replace(inputs, '')
        return JSONResponse({"result": x, "status": True})
    except Exception as e:
        print(e)
        return JSONResponse({"status": False})
# Image generation with the openskyml/dalle-3-xl model (route path assumed).
@app.post("/dalle")
async def get_answer(q: Query):
    text = q.text
    try:
        global client
        imagei = client.text_to_image(text, model='openskyml/dalle-3-xl')
        byte_array = io.BytesIO()
        imagei.save(byte_array, format='JPEG')
        response = Response(content=byte_array.getvalue(), media_type="image/jpeg")
        return response
    except Exception as e:
        print(e)
        return JSONResponse({"status": False})
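

# Minimal client-side sketch showing how these endpoints could be exercised once the
# app is running. The base URL and the route paths ("/image", "/mistral") are the
# assumptions introduced above, not values taken from the original Space.
#
#   import requests
#
#   BASE = "http://localhost:7860"  # assumed local address
#
#   # Text generation: QueryM expects a prompt plus sampling parameters.
#   payload = {"text": "Write a haiku about the sea.",
#              "tokens": 64, "temp": 0.7, "topp": 0.95, "topk": 50}
#   r = requests.post(f"{BASE}/mistral", json=payload)
#   print(r.json())  # {"result": "...", "status": true} on success
#
#   # Image generation: Query only carries the prompt; the response body is JPEG bytes.
#   r = requests.post(f"{BASE}/image", json={"text": "a lighthouse at dusk"})
#   with open("out.jpg", "wb") as f:
#       f.write(r.content)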