Spaces:
Running
Running
import logging | |
import json | |
import time | |
import io | |
import os | |
import re | |
import requests | |
import textwrap | |
import random | |
import hashlib | |
from datetime import datetime | |
from PIL import Image, ImageDraw, ImageFilter, ImageFont | |
import anthropic_bedrock | |
import gradio as gr | |
from opencc import OpenCC | |
from openai import OpenAI | |
from anthropic_bedrock import AnthropicBedrock, HUMAN_PROMPT, AI_PROMPT | |
from google.auth.transport.requests import Request | |
from google.oauth2.service_account import Credentials | |
from google import auth | |
from google.cloud import bigquery | |
from google.cloud import storage | |
SERVICE_ACCOUNT_INFO = os.getenv("GBQ_TOKEN") | |
SCOPES = ["https://www.googleapis.com/auth/cloud-platform"] | |
service_account_info_dict = json.loads(SERVICE_ACCOUNT_INFO) | |
creds = Credentials.from_service_account_info(service_account_info_dict, scopes=SCOPES) | |
gbq_client = bigquery.Client( | |
credentials=creds, project=service_account_info_dict["project_id"] | |
) | |
gcs_client = storage.Client( | |
credentials=creds, project=service_account_info_dict["project_id"] | |
) | |
class CompletionReward: | |
def __init__(self): | |
self.player_backend_user_id = None | |
self.player_name = None | |
self.background_url = None | |
self.player_selected_character = None | |
self.player_selected_model = None | |
self.player_selected_paragraph = None | |
self.paragraph_openai = None | |
self.paragraph_aws = None | |
self.paragraph_google = None | |
self.paragraph_mtk = None | |
self.player_certificate_url = None | |
self.openai_agent = OpenAIAgent() | |
self.aws_agent = AWSAgent() | |
self.google_agent = GoogleAgent() | |
self.mtk_agent = MTKAgent() | |
self.shuffled_response_order = {} | |
self.paragraph_map = { | |
"openai": self.paragraph_openai, | |
"aws": self.paragraph_aws, | |
"google": self.paragraph_google, | |
"mtk": self.paragraph_mtk, | |
} | |
def get_llm_response(self, player_logs): | |
agents_responses = { | |
"openai": self.openai_agent.get_story(player_logs), | |
"aws": self.aws_agent.get_story(player_logs), | |
"google": self.google_agent.get_story(player_logs), | |
"mtk": self.mtk_agent.get_story(player_logs), | |
} | |
self.paragraph_openai = agents_responses["openai"] | |
self.paragraph_aws = agents_responses["aws"] | |
self.paragraph_google = agents_responses["google"] | |
self.paragraph_mtk = agents_responses["mtk"] | |
response_items = list(agents_responses.items()) | |
random.shuffle(response_items) | |
self.shuffled_response_order = { | |
str(index): agent for index, (agent, _) in enumerate(response_items) | |
} | |
shuffled_responses = tuple(response for _, response in response_items) | |
return ( | |
[(None, shuffled_responses[0])], | |
[(None, shuffled_responses[1])], | |
[(None, shuffled_responses[2])], | |
[(None, shuffled_responses[3])], | |
) | |
def set_player_name(self, player_name, player_backend_user_id): | |
self.player_backend_user_id = player_backend_user_id | |
self.player_name = player_name | |
def set_background_url(self, background_url): | |
self.background_url = background_url | |
def set_player_backend_user_id(self, player_backend_user_id): | |
self.player_backend_user_id = player_backend_user_id | |
def set_player_selected_character(self, player_selected_character): | |
character_map = { | |
"露米娜": "0", | |
"索拉拉": "1", | |
"薇丹特": "2", | |
"蔚藍": "3", | |
} | |
self.player_selected_character = player_selected_character | |
self.player_selected_model = self.shuffled_response_order[ | |
character_map[player_selected_character] | |
] | |
self.player_selected_paragraph = self.get_paragraph_by_model( | |
self.player_selected_model | |
) | |
def get_paragraph_by_model(self, model): | |
return getattr(self, f"paragraph_{model}", None) | |
def create_certificate(self): | |
image_url = self.openai_agent.get_background() | |
self.set_background_url(image_url) | |
source_file = ImageProcessor.generate_reward( | |
image_url, | |
self.player_name, | |
self.player_selected_paragraph, | |
self.player_backend_user_id, | |
) | |
public_url = self.upload_blob_and_get_public_url( | |
"mes_completion_rewards", source_file, f"2023_mes/{source_file}" | |
) | |
self.player_certificate_url = public_url | |
return gr.Image(public_url, visible=True, elem_id="certificate") | |
def to_dict(self): | |
return { | |
"player_backend_user_id": self.player_backend_user_id, | |
"player_name": self.player_name, | |
"background_url": self.background_url, | |
"player_selected_model": self.player_selected_model, | |
"player_selected_paragraph": self.player_selected_paragraph, | |
"paragraph_openai": self.paragraph_openai, | |
"paragraph_aws": self.paragraph_aws, | |
"paragraph_google": self.paragraph_google, | |
"paragraph_mtk": self.paragraph_mtk, | |
"player_certificate_url": self.player_certificate_url, | |
"created_at_date": datetime.now().date(), | |
} | |
def insert_data_into_bigquery(self, client, dataset_id, table_id, rows_to_insert): | |
table_ref = client.dataset(dataset_id).table(table_id) | |
table = client.get_table(table_ref) | |
errors = client.insert_rows(table, rows_to_insert) | |
if errors: | |
logging.info("Errors occurred while inserting rows:") | |
for error in errors: | |
print(error) | |
else: | |
logging.info(f"Inserted {len(rows_to_insert)} rows successfully.") | |
def complete_reward( | |
self, | |
): | |
insert_row = self.to_dict() | |
self.insert_data_into_bigquery( | |
gbq_client, "streaming_log", "log_mes_completion_rewards", [insert_row] | |
) | |
logging.info( | |
f"Player {insert_row['player_backend_user_id']} rendered successfully." | |
) | |
with open("./data/completion_reward_issue_status.json") as f: | |
completion_reward_issue_status_dict = json.load(f) | |
completion_reward_issue_status_dict[ | |
insert_row["player_backend_user_id"] | |
] = self.player_certificate_url | |
with open("./data/completion_reward_issue_status.json", "w") as f: | |
json.dump(completion_reward_issue_status_dict, f) | |
def upload_blob_and_get_public_url( | |
self, bucket_name, source_file_name, destination_blob_name | |
): | |
"""Uploads a file to the bucket and makes it publicly accessible.""" | |
# Initialize a storage client | |
bucket = gcs_client.bucket(bucket_name) | |
blob = bucket.blob(destination_blob_name) | |
# Upload the file | |
blob.upload_from_filename(source_file_name) | |
# The public URL can be used to directly access the uploaded file via HTTP | |
public_url = blob.public_url | |
logging.info(f"File {source_file_name} uploaded to {destination_blob_name}.") | |
return public_url | |
class OpenAIAgent: | |
def __init__(self): | |
self.temperature = 0.8 | |
self.frequency_penalty = 0 | |
self.presence_penalty = 0 | |
self.max_tokens = 2048 | |
def get_story(self, user_log): | |
system_prompt = """ | |
我正在舉辦一個學習型的活動,我為學生設計了一個獨特的故事機制,每天每個學生都會收到屬於自己獨特的冒險紀錄,現在我需要你協助我將這些冒險紀錄,製作成一段冒險故事,請 | |
- 以「你」稱呼學生 | |
- 請用 500 字以內的短篇故事 | |
- 試著合併故事記錄成一段連貫、有吸引力的故事 | |
- 請使用 zh_TW | |
""" | |
user_log = f""" | |
```{user_log} | |
``` | |
""" | |
messages = [ | |
{ | |
"role": "system", | |
"content": f"{system_prompt}", | |
}, | |
{ | |
"role": "user", | |
"content": f"{user_log}", | |
}, | |
] | |
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) | |
response = None | |
retry_attempts = 0 | |
while retry_attempts < 5: | |
try: | |
response = client.chat.completions.create( | |
model="gpt-3.5-turbo", | |
messages=messages, | |
temperature=self.temperature, | |
max_tokens=self.max_tokens, | |
frequency_penalty=self.frequency_penalty, | |
presence_penalty=self.presence_penalty, | |
) | |
chinese_converter = OpenCC("s2tw") | |
return chinese_converter.convert(response.choices[0].message.content) | |
except Exception as e: | |
retry_attempts += 1 | |
logging.error(f"OpenAI Attempt {retry_attempts}: {e}") | |
time.sleep(1 * retry_attempts) | |
def get_background(self): | |
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) | |
image_url = None | |
retry_attempts = 0 | |
while retry_attempts < 5: | |
try: | |
logging.info("Generating image...") | |
response = client.images.generate( | |
model="dall-e-3", | |
prompt="Create an image in a retro Ghibli style, with a focus on a universe theme. The artwork should maintain the traditional hand-drawn animation look characteristic of Ghibli and with vibrant color. Imagine a scene set in outer space or a fantastical cosmic environment, rich with vibrant and varied color palettes to capture the mystery and majesty of the universe. The background should be detailed, showcasing stars, planets, and nebulae, blending the Ghibli style's nostalgia and emotional depth with the awe-inspiring aspects of space. The overall feel should be timeless, merging the natural wonder of the cosmos with the storytelling and emotional resonance typical of the retro Ghibli aesthetic. Soft lighting and gentle shading should be used to enhance the dreamlike, otherworldly quality of the scene.", | |
size="1024x1024", | |
quality="standard", | |
n=1, | |
) | |
image_url = response.data[0].url | |
return image_url | |
except Exception as e: | |
retry_attempts += 1 | |
logging.error(f"DALLE Attempt {retry_attempts}: {e}") | |
time.sleep(1 * retry_attempts) # exponential backoff | |
class AWSAgent: | |
def get_story(self, user_log): | |
system_prompt = """ | |
我正在舉辦一個學習型的活動,我為學生設計了一個獨特的故事機制,每天每個學生都會收到屬於自己獨特的冒險紀錄,現在我需要你協助我將這些冒險紀錄,製作成一段冒險故事,請 | |
- 以「你」稱呼學生 | |
- 請用 500 字以內的短篇故事 | |
- 試著合併故事記錄成一段連貫、有吸引力的故事 | |
- 請使用 zh_TW | |
""" | |
user_log = f""" | |
```{user_log} | |
``` | |
""" | |
client = AnthropicBedrock( | |
aws_access_key=os.getenv("AWS_ACCESS_KEY"), | |
aws_secret_key=os.getenv("AWS_SECRET_KEY"), | |
aws_region="us-west-2", | |
) | |
retry_attempts = 0 | |
while retry_attempts < 5: | |
try: | |
completion = client.completions.create( | |
model="anthropic.claude-v2", | |
max_tokens_to_sample=2048, | |
prompt=f"{anthropic_bedrock.HUMAN_PROMPT}{system_prompt},以下是我的故事紀錄```{user_log}``` {anthropic_bedrock.AI_PROMPT}", | |
) | |
chinese_converter = OpenCC("s2tw") | |
return chinese_converter.convert(completion.completion) | |
except Exception as e: | |
retry_attempts += 1 | |
logging.error(f"AWS Attempt {retry_attempts}: {e}") | |
time.sleep(1 * retry_attempts) | |
class GoogleAgent: | |
from google.cloud import aiplatform | |
from vertexai.preview.generative_models import GenerativeModel | |
SERVICE_ACCOUNT_INFO = os.getenv("GBQ_TOKEN") | |
service_account_info_dict = json.loads(SERVICE_ACCOUNT_INFO) | |
SCOPES = ["https://www.googleapis.com/auth/cloud-platform"] | |
creds = Credentials.from_service_account_info( | |
service_account_info_dict, scopes=SCOPES | |
) | |
aiplatform.init( | |
project="junyiacademy", | |
service_account=service_account_info_dict, | |
credentials=creds, | |
) | |
gemini_pro_model = GenerativeModel("gemini-pro") | |
def get_story(self, user_log): | |
system_prompt = """ | |
我正在舉辦一個學習型的活動,我為學生設計了一個獨特的故事機制,每天每個學生都會收到屬於自己獨特的冒險紀錄,現在我需要你協助我將這些冒險紀錄,製作成一段冒險故事,請 | |
- 以「你」稱呼學生 | |
- 請用 500 字以內的短篇故事 | |
- 試著合併故事記錄成一段連貫、有吸引力的故事 | |
- 請使用 zh_TW | |
""" | |
user_log = f""" | |
```{user_log} | |
``` | |
""" | |
retry_attempts = 0 | |
while retry_attempts < 5: | |
try: | |
logging.info("Google Generating response...") | |
model_response = self.gemini_pro_model.generate_content( | |
f"{system_prompt}, 以下是我的冒險故事 ```{user_log}```" | |
) | |
chinese_converter = OpenCC("s2tw") | |
return chinese_converter.convert( | |
model_response.candidates[0].content.parts[0].text | |
) | |
except Exception as e: | |
retry_attempts += 1 | |
logging.error(f"Google Attempt {retry_attempts}: {e}") | |
time.sleep(1 * retry_attempts) | |
class MTKAgent: | |
def get_story(self, user_log): | |
system_prompt = """ | |
我正在舉辦一個學習型的活動,我為學生設計了一個獨特的故事機制,每天每個學生都會收到屬於自己獨特的冒險紀錄,現在我需要你協助我將這些冒險紀錄,製作成一段冒險故事,請 | |
- 以「你」稱呼學生 | |
- 請用 500 字以內的短篇故事 | |
- 試著合併故事記錄成一段連貫、有吸引力的故事 | |
- 請使用 zh_TW | |
""" | |
user_log = f""" | |
```{user_log} | |
``` | |
""" | |
BASE_URL = "http://35.229.245.251:8008/v1" | |
TOKEN = os.getenv("MTK_TOKEN") | |
MODEL_NAME = "model7-c-chat" | |
TEMPERATURE = 1 | |
MAX_TOKENS = 1024 | |
TOP_P = 0 | |
PRESENCE_PENALTY = 0 | |
FREQUENCY_PENALTY = 0 | |
message = f"{system_prompt}, 以下是我的冒險故事 ```{user_log}```" | |
url = os.path.join(BASE_URL, "chat/completions") | |
headers = { | |
"accept": "application/json", | |
"Authorization": f"Bearer {TOKEN}", | |
"Content-Type": "application/json", | |
} | |
data = { | |
"model": MODEL_NAME, | |
"messages": str(message), | |
"temperature": TEMPERATURE, | |
"n": 1, | |
"max_tokens": MAX_TOKENS, | |
"stop": "", | |
"top_p": TOP_P, | |
"logprobs": 0, | |
"echo": False, | |
"presence_penalty": PRESENCE_PENALTY, | |
"frequency_penalty": FREQUENCY_PENALTY, | |
} | |
retry_attempts = 0 | |
while retry_attempts < 5: | |
try: | |
response = requests.post( | |
url, headers=headers, data=json.dumps(data) | |
).json() | |
response_text = response["choices"][0]["message"]["content"] | |
matched_content = re.search("```(.+)", response_text, re.DOTALL) | |
extracted_content = ( | |
matched_content.group(1).strip() if matched_content else None | |
) | |
chinese_converter = OpenCC("s2tw") | |
if extracted_content: | |
return chinese_converter.convert(extracted_content) | |
else: | |
return chinese_converter.convert(response_text) | |
except Exception as e: | |
retry_attempts += 1 | |
logging.error(f"MTK Attempt {retry_attempts}: {e}") | |
time.sleep(1 * retry_attempts) | |
class ImageProcessor: | |
def draw_shadow( | |
image, box, radius, offset=(10, 10), shadow_color=(0, 0, 0, 128), blur_radius=5 | |
): | |
shadow_image = Image.new("RGBA", image.size, (0, 0, 0, 0)) | |
shadow_draw = ImageDraw.Draw(shadow_image) | |
shadow_box = [ | |
box[0] + offset[0], | |
box[1] + offset[1], | |
box[2] + offset[0], | |
box[3] + offset[1], | |
] | |
shadow_draw.rounded_rectangle(shadow_box, fill=shadow_color, radius=radius) | |
shadow_image = shadow_image.filter(ImageFilter.GaussianBlur(blur_radius)) | |
image.paste(shadow_image, (0, 0), shadow_image) | |
def generate_reward(url, player_name, paragraph, player_backend_user_id): | |
retry_attempts = 0 | |
while retry_attempts < 5: | |
try: | |
response = requests.get(url) | |
break | |
except requests.RequestException as e: | |
retry_attempts += 1 | |
logging.error(f"Attempt {retry_attempts}: {e}") | |
time.sleep(1 * retry_attempts) # exponential backoff | |
image_bytes = io.BytesIO(response.content) | |
img = Image.open(image_bytes) | |
tmp_img = Image.new("RGBA", img.size, (0, 0, 0, 0)) | |
draw = ImageDraw.Draw(tmp_img) | |
# Draw the box | |
left, right = 50, img.width - 50 | |
box_height = 600 | |
top = (img.height - box_height) // 2 | |
bottom = (img.height + box_height) // 2 | |
border_radius = 20 | |
# Draw the rounded rectangle | |
fill_color = (255, 255, 255, 200) | |
draw.rounded_rectangle( | |
[left, top, right, bottom], | |
fill=fill_color, | |
outline=None, | |
radius=border_radius, | |
) | |
img.paste(Image.alpha_composite(img.convert("RGBA"), tmp_img), (0, 0), tmp_img) | |
draw = ImageDraw.Draw(img) | |
# Draw the text | |
title_font = ImageFont.truetype("NotoSansTC-Bold.ttf", 34) | |
body_font = ImageFont.truetype("NotoSansTC-Light.ttf", 12) | |
# Title text | |
title = f"光束守護者 - {player_name} 的冒險故事" | |
title_x, title_y = left + 20, top + 20 # Adjust padding as needed | |
draw.text((title_x, title_y), title, font=title_font, fill="black") | |
# Paragraph text with newlines | |
body_x, body_y = left + 20, title_y + 60 # Adjust position as needed | |
for line in paragraph.split("\n"): | |
wrapped_lines = textwrap.wrap(line, width=73) | |
for wrapped_line in wrapped_lines: | |
draw.text((body_x, body_y), wrapped_line, font=body_font, fill="black") | |
body_y += 30 | |
# Save the image with the text | |
def get_md5_hash(text): | |
return hashlib.md5(text.encode("utf-8")).hexdigest() | |
updated_image_path = f"certificate_{get_md5_hash(player_backend_user_id)}.png" | |
img.save(updated_image_path) | |
return updated_image_path | |