|
import pandas as pd |
|
import os |
|
import gradio as gr |
|
import threading |
|
import time |
|
from groq import Groq |
|
|
|
|
|
# Groq API client; picks up GROQ_API_KEY from the environment by default.
client = Groq()


# Roll over to a new CSV shard once the current one reaches ~1.1 GiB.
MAX_SIZE = 1.1 * 1024 * 1024 * 1024

# Directory where the generated CSV shards are written.
DATA_DIRECTORY = 'data'

# Seconds to pause between successful generation iterations.
UPDATE_INTERVAL = 1


os.makedirs(DATA_DIRECTORY, exist_ok=True)


# Mutable generator state shared between the background thread and the UI:
# current shard number, its path, every shard path created so far, and a
# running (approximate, whitespace-split) token total.
file_index = 1

current_file = os.path.join(DATA_DIRECTORY, f'data{file_index}.csv')

file_paths = [current_file]

combined_tokens = 0
|
|
|
|
|
def get_file_size(filename):
    """Return the size of *filename* in bytes, or 0 when it is not a regular file."""
    if not os.path.isfile(filename):
        return 0
    return os.path.getsize(filename)
|
|
|
|
|
def _stream_chat(messages, max_tokens):
    """Stream one chat completion from the Groq API and collect the text.

    Parameters:
        messages: chat message list passed straight to the API.
        max_tokens: completion-length cap for this request.

    Returns:
        (text, word_count) — the concatenated streamed content and an
        approximate token count obtained by whitespace-splitting each chunk
        (a word count, not a true model token count).
    """
    completion = client.chat.completions.create(
        model="gemma2-9b-it",
        messages=messages,
        temperature=1,
        max_tokens=max_tokens,
        top_p=1,
        stream=True,
        stop=None,
    )

    text = ""
    word_count = 0
    for chunk in completion:
        content = chunk.choices[0].delta.content
        if content:
            text += content
            word_count += len(content.split())
    return text, word_count


def generate_and_save_data():
    """Background worker: generate prompt/response pairs forever and persist them.

    Each iteration:
      1. asks the model to invent a diverse user-style prompt,
      2. asks the model to answer that prompt,
      3. appends the pair as one CSV row, rolling over to a new shard once
         the current file reaches MAX_SIZE.

    Mutates the module globals ``file_index``/``current_file``/``file_paths``/
    ``combined_tokens`` that the Gradio UI reads (no locking — stale reads are
    acceptable for display).  Never returns; any exception (network, API,
    disk) is logged and the loop retries after 5 seconds.
    """
    global file_index, current_file, file_paths, combined_tokens

    # Make sure the first shard exists with a header row before appending.
    if not os.path.isfile(current_file):
        pd.DataFrame(columns=["prompt", "response"]).to_csv(current_file, index=False)

    while True:
        try:
            # Step 1: have the model invent a prompt a user might ask.
            prompt, prompt_tokens = _stream_chat(
                [
                    {
                        "role": "user",
                        "content": "give me a single prompt to prompt an ai model, simulating what users could want from you. ensure that it is diverse and high quality. for each, choose a random writing style (though it has to be a common one), random length and random clarity of the prompt. ensure that it is a single prompt, and just the prompt itself, nothing else. eg, don't close the prompt in quotation marks or say Here is a single prompt that meets your requirements or anything similar to that"
                    }
                ],
                max_tokens=1024,
            )

            # Step 2: answer the generated prompt.
            response, response_tokens = _stream_chat(
                [
                    {
                        "role": "user",
                        "content": prompt
                    }
                ],
                max_tokens=5000,
            )

            combined_tokens += (prompt_tokens + response_tokens)

            print("Generated prompt:", prompt)
            print("Response to the generated prompt:", response)

            data = pd.DataFrame({"prompt": [prompt], "response": [response]})

            # Roll over to a fresh shard once the current one is full.
            # newline='' is the csv convention for text-mode handles (avoids
            # doubled \r\r\n line endings on Windows); utf-8 keeps the shards
            # portable regardless of the platform's default encoding.
            if get_file_size(current_file) >= MAX_SIZE:
                file_index += 1
                current_file = os.path.join(DATA_DIRECTORY, f'data{file_index}.csv')
                file_paths.append(current_file)

                with open(current_file, 'w', newline='', encoding='utf-8') as f:
                    data.to_csv(f, header=True, index=False)
            else:
                with open(current_file, 'a', newline='', encoding='utf-8') as f:
                    data.to_csv(f, header=False, index=False)

            time.sleep(UPDATE_INTERVAL)

        except Exception as e:
            # Broad catch is deliberate: this daemon loop must survive
            # transient API, network and filesystem failures.
            print(f"An error occurred: {e}. Retrying in 5 seconds...")
            time.sleep(5)
|
|
|
|
|
def get_available_files():
    """Return the tracked shard paths that actually exist on disk."""
    return list(filter(os.path.isfile, file_paths))
|
|
|
|
|
def update_file_list():
    """Refresh the dropdown's choices with the CSV shards present on disk."""
    available = get_available_files()
    return gr.update(choices=available)
|
|
|
|
|
def update_token_count():
    """Return the running total of (approximate) tokens generated so far.

    Reads the module-level ``combined_tokens`` counter incremented by the
    background generator thread; no locking, so a slightly stale value is
    possible and acceptable for display purposes.
    """
    return combined_tokens
|
|
|
|
|
def display_file_content(selected_file):
    """Load the selected CSV into a DataFrame; empty DataFrame when nothing is selected."""
    if not selected_file:
        return pd.DataFrame()
    return pd.read_csv(selected_file)
|
|
|
|
|
# Start the generator loop in the background.  daemon=True so the thread
# does not keep the interpreter alive once the Gradio app shuts down.
thread = threading.Thread(target=generate_and_save_data)

thread.daemon = True

thread.start()
|
|
|
|
|
# Gradio UI: browse the generated CSV shards and watch the token counter.
with gr.Blocks() as app:

    gr.Markdown("## AI Prompt and Response Generator")

    gr.Markdown("This app continuously generates AI prompts and responses, and writes them to CSV files.")

    # Dropdown lists the shards known at page-build time; the viewer shows the
    # selected file's rows and the File component exposes it for download.
    file_selector = gr.Dropdown(label="Select a data file to view and download", choices=get_available_files())

    file_viewer = gr.DataFrame(label="CSV File Content")

    download_button = gr.File(label="Download Selected File")


    def download_file(selected_file):
        """Pass the selected path through so gr.File offers it for download."""
        return selected_file


    refresh_button = gr.Button("Refresh File List")

    refresh_button.click(update_file_list, outputs=file_selector)

    # Selecting a file both renders its contents and updates the download link.
    file_selector.change(display_file_content, inputs=file_selector, outputs=file_viewer)

    file_selector.change(download_file, inputs=file_selector, outputs=download_button)


    # Snapshot of the token counter at page-build time; refreshed on demand.
    token_display = gr.Textbox(label="Combined Tokens", value=str(update_token_count()), interactive=False)


    def update_token_display():
        """Return the current combined token count as a string for the textbox."""
        return str(update_token_count())


    token_refresh = gr.Button("Refresh Token Count")

    token_refresh.click(update_token_display, outputs=token_display)


# Blocking call: serves the UI until the process is stopped.
app.launch()
|
|