|
import json |
|
import logging |
|
import re |
|
import requests |
|
from typing import List |
|
|
|
import g4f |
|
from loguru import logger |
|
from openai import AzureOpenAI, OpenAI |
|
from openai.types.chat import ChatCompletion |
|
|
|
from app.config import config |
|
|
|
_max_retries = 5 |
|
|
|
|
|
def _generate_response(prompt: str) -> str: |
|
try: |
|
content = "" |
|
llm_provider = config.app.get("llm_provider", "openai") |
|
logger.info(f"llm provider: {llm_provider}") |
|
if llm_provider == "g4f": |
|
model_name = config.app.get("g4f_model_name", "") |
|
if not model_name: |
|
model_name = "gpt-3.5-turbo-16k-0613" |
|
content = g4f.ChatCompletion.create( |
|
model=model_name, |
|
messages=[{"role": "user", "content": prompt}], |
|
) |
|
else: |
|
api_version = "" |
|
if llm_provider == "moonshot": |
|
api_key = config.app.get("moonshot_api_key") |
|
model_name = config.app.get("moonshot_model_name") |
|
base_url = "https://api.moonshot.cn/v1" |
|
elif llm_provider == "ollama": |
|
|
|
api_key = "ollama" |
|
model_name = config.app.get("ollama_model_name") |
|
base_url = config.app.get("ollama_base_url", "") |
|
if not base_url: |
|
base_url = "http://localhost:11434/v1" |
|
elif llm_provider == "openai": |
|
api_key = config.app.get("openai_api_key") |
|
model_name = config.app.get("openai_model_name") |
|
base_url = config.app.get("openai_base_url", "") |
|
if not base_url: |
|
base_url = "https://api.openai.com/v1" |
|
elif llm_provider == "oneapi": |
|
api_key = config.app.get("oneapi_api_key") |
|
model_name = config.app.get("oneapi_model_name") |
|
base_url = config.app.get("oneapi_base_url", "") |
|
elif llm_provider == "azure": |
|
api_key = config.app.get("azure_api_key") |
|
model_name = config.app.get("azure_model_name") |
|
base_url = config.app.get("azure_base_url", "") |
|
api_version = config.app.get("azure_api_version", "2024-02-15-preview") |
|
elif llm_provider == "gemini": |
|
api_key = config.app.get("gemini_api_key") |
|
model_name = config.app.get("gemini_model_name") |
|
base_url = "***" |
|
elif llm_provider == "qwen": |
|
api_key = config.app.get("qwen_api_key") |
|
model_name = config.app.get("qwen_model_name") |
|
base_url = "***" |
|
elif llm_provider == "cloudflare": |
|
api_key = config.app.get("cloudflare_api_key") |
|
model_name = config.app.get("cloudflare_model_name") |
|
account_id = config.app.get("cloudflare_account_id") |
|
base_url = "***" |
|
elif llm_provider == "deepseek": |
|
api_key = config.app.get("deepseek_api_key") |
|
model_name = config.app.get("deepseek_model_name") |
|
base_url = config.app.get("deepseek_base_url") |
|
if not base_url: |
|
base_url = "https://api.deepseek.com" |
|
elif llm_provider == "ernie": |
|
api_key = config.app.get("ernie_api_key") |
|
secret_key = config.app.get("ernie_secret_key") |
|
base_url = config.app.get("ernie_base_url") |
|
model_name = "***" |
|
if not secret_key: |
|
raise ValueError( |
|
f"{llm_provider}: secret_key is not set, please set it in the config.toml file." |
|
) |
|
elif llm_provider == "pollinations": |
|
try: |
|
base_url = config.app.get("pollinations_base_url", "") |
|
if not base_url: |
|
base_url = "https://text.pollinations.ai/openai" |
|
model_name = config.app.get("pollinations_model_name", "openai-fast") |
|
|
|
|
|
payload = { |
|
"model": model_name, |
|
"messages": [ |
|
{"role": "user", "content": prompt} |
|
], |
|
"seed": 101 |
|
} |
|
|
|
|
|
if config.app.get("pollinations_private"): |
|
payload["private"] = True |
|
if config.app.get("pollinations_referrer"): |
|
payload["referrer"] = config.app.get("pollinations_referrer") |
|
|
|
headers = { |
|
"Content-Type": "application/json" |
|
} |
|
|
|
|
|
response = requests.post(base_url, headers=headers, json=payload) |
|
response.raise_for_status() |
|
result = response.json() |
|
|
|
if result and "choices" in result and len(result["choices"]) > 0: |
|
content = result["choices"][0]["message"]["content"] |
|
return content.replace("\n", "") |
|
else: |
|
raise Exception(f"[{llm_provider}] returned an invalid response format") |
|
|
|
except requests.exceptions.RequestException as e: |
|
raise Exception(f"[{llm_provider}] request failed: {str(e)}") |
|
except Exception as e: |
|
raise Exception(f"[{llm_provider}] error: {str(e)}") |
|
|
|
if llm_provider not in ["pollinations", "ollama"]: |
|
if not api_key: |
|
raise ValueError( |
|
f"{llm_provider}: api_key is not set, please set it in the config.toml file." |
|
) |
|
if not model_name: |
|
raise ValueError( |
|
f"{llm_provider}: model_name is not set, please set it in the config.toml file." |
|
) |
|
if not base_url: |
|
raise ValueError( |
|
f"{llm_provider}: base_url is not set, please set it in the config.toml file." |
|
) |
|
|
|
if llm_provider == "qwen": |
|
import dashscope |
|
from dashscope.api_entities.dashscope_response import GenerationResponse |
|
|
|
dashscope.api_key = api_key |
|
response = dashscope.Generation.call( |
|
model=model_name, messages=[{"role": "user", "content": prompt}] |
|
) |
|
if response: |
|
if isinstance(response, GenerationResponse): |
|
status_code = response.status_code |
|
if status_code != 200: |
|
raise Exception( |
|
f'[{llm_provider}] returned an error response: "{response}"' |
|
) |
|
|
|
content = response["output"]["text"] |
|
return content.replace("\n", "") |
|
else: |
|
raise Exception( |
|
f'[{llm_provider}] returned an invalid response: "{response}"' |
|
) |
|
else: |
|
raise Exception(f"[{llm_provider}] returned an empty response") |
|
|
|
if llm_provider == "gemini": |
|
import google.generativeai as genai |
|
|
|
genai.configure(api_key=api_key, transport="rest") |
|
|
|
generation_config = { |
|
"temperature": 0.5, |
|
"top_p": 1, |
|
"top_k": 1, |
|
"max_output_tokens": 2048, |
|
} |
|
|
|
safety_settings = [ |
|
{ |
|
"category": "HARM_CATEGORY_HARASSMENT", |
|
"threshold": "BLOCK_ONLY_HIGH", |
|
}, |
|
{ |
|
"category": "HARM_CATEGORY_HATE_SPEECH", |
|
"threshold": "BLOCK_ONLY_HIGH", |
|
}, |
|
{ |
|
"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", |
|
"threshold": "BLOCK_ONLY_HIGH", |
|
}, |
|
{ |
|
"category": "HARM_CATEGORY_DANGEROUS_CONTENT", |
|
"threshold": "BLOCK_ONLY_HIGH", |
|
}, |
|
] |
|
|
|
model = genai.GenerativeModel( |
|
model_name=model_name, |
|
generation_config=generation_config, |
|
safety_settings=safety_settings, |
|
) |
|
|
|
try: |
|
response = model.generate_content(prompt) |
|
candidates = response.candidates |
|
generated_text = candidates[0].content.parts[0].text |
|
except (AttributeError, IndexError) as e: |
|
print("Gemini Error:", e) |
|
|
|
return generated_text |
|
|
|
if llm_provider == "cloudflare": |
|
response = requests.post( |
|
f"https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/run/{model_name}", |
|
headers={"Authorization": f"Bearer {api_key}"}, |
|
json={ |
|
"messages": [ |
|
{ |
|
"role": "system", |
|
"content": "You are a friendly assistant", |
|
}, |
|
{"role": "user", "content": prompt}, |
|
] |
|
}, |
|
) |
|
result = response.json() |
|
logger.info(result) |
|
return result["result"]["response"] |
|
|
|
if llm_provider == "ernie": |
|
response = requests.post( |
|
"https://aip.baidubce.com/oauth/2.0/token", |
|
params={ |
|
"grant_type": "client_credentials", |
|
"client_id": api_key, |
|
"client_secret": secret_key, |
|
} |
|
) |
|
access_token = response.json().get("access_token") |
|
url = f"{base_url}?access_token={access_token}" |
|
|
|
payload = json.dumps( |
|
{ |
|
"messages": [{"role": "user", "content": prompt}], |
|
"temperature": 0.5, |
|
"top_p": 0.8, |
|
"penalty_score": 1, |
|
"disable_search": False, |
|
"enable_citation": False, |
|
"response_format": "text", |
|
} |
|
) |
|
headers = {"Content-Type": "application/json"} |
|
|
|
response = requests.request( |
|
"POST", url, headers=headers, data=payload |
|
).json() |
|
return response.get("result") |
|
|
|
if llm_provider == "azure": |
|
client = AzureOpenAI( |
|
api_key=api_key, |
|
api_version=api_version, |
|
azure_endpoint=base_url, |
|
) |
|
else: |
|
client = OpenAI( |
|
api_key=api_key, |
|
base_url=base_url, |
|
) |
|
|
|
response = client.chat.completions.create( |
|
model=model_name, messages=[{"role": "user", "content": prompt}] |
|
) |
|
if response: |
|
if isinstance(response, ChatCompletion): |
|
content = response.choices[0].message.content |
|
else: |
|
raise Exception( |
|
f'[{llm_provider}] returned an invalid response: "{response}", please check your network ' |
|
f"connection and try again." |
|
) |
|
else: |
|
raise Exception( |
|
f"[{llm_provider}] returned an empty response, please check your network connection and try again." |
|
) |
|
|
|
return content.replace("\n", "") |
|
except Exception as e: |
|
return f"Error: {str(e)}" |
|
|
|
|
|
def generate_script( |
|
video_subject: str, language: str = "", paragraph_number: int = 1 |
|
) -> str: |
|
prompt = f""" |
|
# Role: Video Script Generator |
|
|
|
## Goals: |
|
Generate a script for a video, depending on the subject of the video. |
|
|
|
## Constrains: |
|
1. the script is to be returned as a string with the specified number of paragraphs. |
|
2. do not under any circumstance reference this prompt in your response. |
|
3. get straight to the point, don't start with unnecessary things like, "welcome to this video". |
|
4. you must not include any type of markdown or formatting in the script, never use a title. |
|
5. only return the raw content of the script. |
|
6. do not include "voiceover", "narrator" or similar indicators of what should be spoken at the beginning of each paragraph or line. |
|
7. you must not mention the prompt, or anything about the script itself. also, never talk about the amount of paragraphs or lines. just write the script. |
|
8. respond in the same language as the video subject. |
|
|
|
# Initialization: |
|
- video subject: {video_subject} |
|
- number of paragraphs: {paragraph_number} |
|
""".strip() |
|
if language: |
|
prompt += f"\n- language: {language}" |
|
|
|
final_script = "" |
|
logger.info(f"subject: {video_subject}") |
|
|
|
def format_response(response): |
|
|
|
|
|
response = response.replace("*", "") |
|
response = response.replace("#", "") |
|
|
|
|
|
response = re.sub(r"\[.*\]", "", response) |
|
response = re.sub(r"\(.*\)", "", response) |
|
|
|
|
|
paragraphs = response.split("\n\n") |
|
|
|
|
|
|
|
|
|
|
|
return "\n\n".join(paragraphs) |
|
|
|
for i in range(_max_retries): |
|
try: |
|
response = _generate_response(prompt=prompt) |
|
if response: |
|
final_script = format_response(response) |
|
else: |
|
logging.error("gpt returned an empty response") |
|
|
|
|
|
if final_script and "当日额度已消耗完" in final_script: |
|
raise ValueError(final_script) |
|
|
|
if final_script: |
|
break |
|
except Exception as e: |
|
logger.error(f"failed to generate script: {e}") |
|
|
|
if i < _max_retries: |
|
logger.warning(f"failed to generate video script, trying again... {i + 1}") |
|
if "Error: " in final_script: |
|
logger.error(f"failed to generate video script: {final_script}") |
|
else: |
|
logger.success(f"completed: \n{final_script}") |
|
return final_script.strip() |
|
|
|
|
|
def generate_terms(video_subject: str, video_script: str, amount: int = 5) -> List[str]: |
|
prompt = f""" |
|
# Role: Video Search Terms Generator |
|
|
|
## Goals: |
|
Generate {amount} search terms for stock videos, depending on the subject of a video. |
|
|
|
## Constrains: |
|
1. the search terms are to be returned as a json-array of strings. |
|
2. each search term should consist of 1-3 words, always add the main subject of the video. |
|
3. you must only return the json-array of strings. you must not return anything else. you must not return the script. |
|
4. the search terms must be related to the subject of the video. |
|
5. reply with english search terms only. |
|
|
|
## Output Example: |
|
["search term 1", "search term 2", "search term 3","search term 4","search term 5"] |
|
|
|
## Context: |
|
### Video Subject |
|
{video_subject} |
|
|
|
### Video Script |
|
{video_script} |
|
|
|
Please note that you must use English for generating video search terms; Chinese is not accepted. |
|
""".strip() |
|
|
|
logger.info(f"subject: {video_subject}") |
|
|
|
search_terms = [] |
|
response = "" |
|
for i in range(_max_retries): |
|
try: |
|
response = _generate_response(prompt) |
|
if "Error: " in response: |
|
logger.error(f"failed to generate video script: {response}") |
|
return response |
|
search_terms = json.loads(response) |
|
if not isinstance(search_terms, list) or not all( |
|
isinstance(term, str) for term in search_terms |
|
): |
|
logger.error("response is not a list of strings.") |
|
continue |
|
|
|
except Exception as e: |
|
logger.warning(f"failed to generate video terms: {str(e)}") |
|
if response: |
|
match = re.search(r"\[.*]", response) |
|
if match: |
|
try: |
|
search_terms = json.loads(match.group()) |
|
except Exception as e: |
|
logger.warning(f"failed to generate video terms: {str(e)}") |
|
pass |
|
|
|
if search_terms and len(search_terms) > 0: |
|
break |
|
if i < _max_retries: |
|
logger.warning(f"failed to generate video terms, trying again... {i + 1}") |
|
|
|
logger.success(f"completed: \n{search_terms}") |
|
return search_terms |
|
|
|
|
|
if __name__ == "__main__": |
|
video_subject = "生命的意义是什么" |
|
script = generate_script( |
|
video_subject=video_subject, language="zh-CN", paragraph_number=1 |
|
) |
|
print("######################") |
|
print(script) |
|
search_terms = generate_terms( |
|
video_subject=video_subject, video_script=script, amount=5 |
|
) |
|
print("######################") |
|
print(search_terms) |
|
|