import streamlit as st
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
from huggingface_hub import InferenceClient
import json
import os
import pickle
import requests
import torch
from gensim.summarization import summarize  # requires gensim<4.0; the summarization module was removed in 4.x
import re
from pygments import highlight
from pygments.lexers import PythonLexer
from pygments.formatters import HtmlFormatter
import sys
import time
from threading import Thread
import subprocess
client = InferenceClient(
    "mistralai/Mixtral-8x7B-Instruct-v0.1"
)
def format_prompt(message, history):
    prompt = "<s>"
    for user_prompt, bot_response in history:
        prompt += f"[INST] {user_prompt} [/INST]"
        prompt += f" {bot_response}</s> "
    prompt += f"[INST] {message} [/INST]"
    return prompt
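# A minimal usage sketch (not wired into the UI below): format_prompt pairs
# with the InferenceClient above. The function name generate_chat_response and
# the generation settings are illustrative choices, not part of the app.
def generate_chat_response(message, history):
    """Generates a Mixtral reply for `message` given prior (user, bot) turns."""
    prompt = format_prompt(message, history)
    return client.text_generation(
        prompt,
        max_new_tokens=256,
        do_sample=True,
        temperature=0.7,
    )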
# --- Constants ---
MODEL_URL = "https://huggingface.co/models"
TASKS_FILE = "tasks.json"
CODE_EXECUTION_ENV = {}
PIPELINE_RUNNING = False
# --- Model Initialization ---
generator = pipeline('text-generation', model='EleutherAI/gpt-neo-2.7B')
sentiment_model_name = "distilbert-base-uncased-finetuned-sst-2-english"
sentiment_tokenizer = AutoTokenizer.from_pretrained(sentiment_model_name)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(sentiment_model_name)
# --- Helper Functions ---
def generate_code(prompt):
    """Generates code based on the given prompt."""
    generated = generator(prompt, max_length=200, do_sample=True, temperature=0.9)
    return generated[0]['generated_text']
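# Example (illustrative): generate_code("Write a function that reverses a list")
# returns the prompt plus the sampled continuation; note that max_length=200
# counts tokens including the prompt itself.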
def add_task(task_description):
    """Adds a new task to the task list."""
    try:
        with open(TASKS_FILE, "r") as infile:
            tasks = json.load(infile)
    except FileNotFoundError:
        tasks = []
    tasks.append({"task": task_description["task"], "description": task_description["description"], "status": "Pending"})
    with open(TASKS_FILE, "w") as outfile:
        json.dump(tasks, outfile)
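# The resulting tasks.json holds a flat list of records, e.g.:
# [{"task": "demo", "description": "print hello", "status": "Pending"}]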
def display_code(code):
    """Displays the code in a formatted manner."""
    formatter = HtmlFormatter(style='default')
    lexer = PythonLexer()
    html = highlight(code, lexer, formatter)
    st.markdown(html, unsafe_allow_html=True)
def summarize_text(text):
    """Summarizes the given text."""
    return summarize(text)
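# Note: gensim's extractive summarize() raises ValueError when the input has
# too few sentences; the caller in main() only guards on character length
# (> 100), which does not guarantee enough sentences.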
def analyze_sentiment(text):
    """Analyzes the sentiment of the given text."""
    inputs = sentiment_tokenizer(text, return_tensors='pt')
    outputs = sentiment_model(**inputs)
    probs = torch.nn.functional.softmax(outputs.logits, dim=1)
    return probs.tolist()[0][1]
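# For the SST-2 checkpoint loaded above, label index 1 is POSITIVE, so this
# returns the probability that the text is positive.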
def run_tests(code):
    """Runs tests on the given code."""
    # Placeholder for testing logic
    return "Tests passed."
def load_model(model_name):
    """Loads a pre-trained model."""
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(model_name)
    return model, tokenizer
def save_model(model, tokenizer, file_name):
    """Saves the model and tokenizer."""
    model.save_pretrained(file_name)
    tokenizer.save_pretrained(file_name)
def load_dataset(file_name):
    """Loads a dataset from a file."""
    data = []
    with open(file_name, "r") as infile:
        for line in infile:
            data.append(line.strip())
    return data
def save_dataset(data, file_name):
    """Saves a dataset to a file."""
    with open(file_name, "w") as outfile:
        for item in data:
            outfile.write(f"{item}\n")
def download_file(url, file_name):
    """Downloads a file from a URL."""
    response = requests.get(url)
    if response.status_code == 200:
        with open(file_name, "wb") as outfile:
            outfile.write(response.content)
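# For large files, a streaming variant avoids holding the whole response in
# memory (a sketch; the 1 MiB chunk size is an arbitrary choice):
# with requests.get(url, stream=True) as r:
#     with open(file_name, "wb") as f:
#         for chunk in r.iter_content(chunk_size=1 << 20):
#             f.write(chunk)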
def get_model_list():
    """Gets a list of available models by scraping the models page."""
    response = requests.get(MODEL_URL)
    models = []
    # model IDs may contain dots and hyphens, which \w alone misses
    for match in re.finditer(r"<a href='/models/([\w.-]+/[\w.-]+)'", response.text):
        models.append(match.group(1))
    return models
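# A sturdier alternative sketch: the huggingface_hub package (already pulled in
# by transformers) exposes list_models(), avoiding regex-scraping HTML that can
# change at any time. The limit of 50 is an illustrative choice.
def get_model_list_via_api(limit=50):
    """Lists model IDs via the Hub API instead of scraping the models page."""
    from huggingface_hub import list_models
    return [m.id for m in list_models(limit=limit)]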
def predict_text(model, tokenizer, text):
    """Predicts the text using the given model and tokenizer."""
    inputs = tokenizer(text, return_tensors='pt')
    outputs = model(**inputs)
    probs = torch.nn.functional.softmax(outputs.logits, dim=1)
    return probs.tolist()[0]
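# Example (illustrative): for a two-class sentiment model this returns
# [P(negative), P(positive)], e.g. roughly [0.02, 0.98] for clearly positive text.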
def get_user_input():
    """Gets user input."""
    input_type = st.selectbox("Select an input type", ["Text", "File", "Model"])
    if input_type == "Text":
        prompt = st.text_input("Enter text:")
        return prompt
    elif input_type == "File":
        uploaded_file = st.file_uploader("Choose a file")
        if uploaded_file:
            return uploaded_file.read().decode("utf-8")
        else:
            return ""
    elif input_type == "Model":
        model_name = st.selectbox("Select a model", get_model_list())
        model, tokenizer = load_model(model_name)  # loaded but unused; only the text is returned
        text = st.text_area("Enter text:")
        return text
def get_tasks():
    """Loads tasks from tasks.json."""
    try:
        with open(TASKS_FILE, "r") as infile:
            tasks = json.load(infile)
        return tasks
    except FileNotFoundError:
        return []
def complete_task(task_id):
    """Completes a task."""
    tasks = get_tasks()
    if 0 <= task_id < len(tasks):
        tasks[task_id]["status"] = "Completed"
        with open(TASKS_FILE, "w") as outfile:
            json.dump(tasks, outfile)
        st.write(f"Task {task_id} completed.")
    else:
        st.write(f"Invalid task ID: {task_id}")
def delete_task(task_id):
    """Deletes a task."""
    tasks = get_tasks()
    if 0 <= task_id < len(tasks):
        del tasks[task_id]
        with open(TASKS_FILE, "w") as outfile:
            json.dump(tasks, outfile)
        st.write(f"Task {task_id} deleted.")
    else:
        st.write(f"Invalid task ID: {task_id}")
def run_pipeline():
    """Runs the pipeline."""
    global PIPELINE_RUNNING
    PIPELINE_RUNNING = True
    while PIPELINE_RUNNING:
        tasks = get_tasks()
        for i, task in enumerate(tasks):
            if task["status"] == "Pending":
                st.write(f"Processing task {i}: {task['task']}")
                try:
                    code = generate_code(task['description'])
                    st.write(f"Generated code:\n{code}")
                    # Execute code in a separate process; sys.executable runs the
                    # same interpreter as this app rather than whatever "python"
                    # happens to resolve to on PATH
                    process = subprocess.Popen([sys.executable, "-c", code], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                    output, error = process.communicate()
                    st.write(f"Code output:\n{output.decode('utf-8')}")
                    st.write(f"Code error:\n{error.decode('utf-8')}")
                    # Run tests (replace with actual logic)
                    test_result = run_tests(code)
                    st.write(f"Test result: {test_result}")
                    # Update task status
                    tasks[i]["status"] = "Completed"
                    with open(TASKS_FILE, "w") as outfile:
                        json.dump(tasks, outfile)
                except Exception as e:
                    st.write(f"Error processing task {i}: {e}")
                    tasks[i]["status"] = "Failed"
                    with open(TASKS_FILE, "w") as outfile:
                        json.dump(tasks, outfile)
        time.sleep(1)  # Adjust delay as needed
def stop_pipeline():
    """Stops the pipeline."""
    global PIPELINE_RUNNING
    PIPELINE_RUNNING = False
    st.write("Pipeline stopped.")
def load_saved_model(file_name):
    """Loads a pickled model and tokenizer from disk."""
    try:
        with open(file_name, "rb") as f:
            model = pickle.load(f)
        with open(file_name.replace(".sav", "_tokenizer.pkl"), "rb") as f:
            tokenizer = pickle.load(f)
        return model, tokenizer
    except FileNotFoundError:
        st.write(f"Model not found: {file_name}")
        return None, None
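# Caveat: pickle.load executes arbitrary code embedded in the file, so only
# load model files you created yourself or otherwise trust.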
def delete_model(file_name):
    """Deletes a saved model."""
    try:
        os.remove(file_name)
        os.remove(file_name.replace(".sav", "_tokenizer.pkl"))
        st.write(f"Model deleted: {file_name}")
    except FileNotFoundError:
        st.write(f"Model not found: {file_name}")
# --- Streamlit App ---
def main():
    """Main function."""
    st.title("AI-Powered Code Interpreter")
    # --- Code Generation and Analysis ---
    st.subheader("Code Generation and Analysis")
    text = get_user_input()
    if text:
        prompt = "Generate a python function that:\n\n" + text
        code = generate_code(prompt)
        summarized_text = ""
        if len(text) > 100:
            summarized_text = summarize_text(text)
        sentiment = "Positive" if analyze_sentiment(text) > 0.5 else "Negative"
        tests_passed = ""
        if code:
            tests_passed = run_tests(code)
        st.subheader("Summary:")
        st.write(summarized_text)
        st.subheader("Sentiment:")
        st.write(sentiment)
        st.subheader("Code:")
        display_code(code)
        st.subheader("Tests:")
        st.write(tests_passed)
if st.button("Save code"): | |
file_name = st.text_input("Enter file name:") | |
with open(file_name, "w") as outfile: | |
outfile.write(code) | |
    # --- Dataset Management ---
    st.subheader("Dataset Management")
    dataset_file = st.text_input("Enter dataset file name:")
    if st.button("Load dataset") and dataset_file:
        data = load_dataset(dataset_file)
        st.write(data)
    dataset_text = st.text_area("Enter data (one item per line):")
    if st.button("Save dataset") and dataset_file:
        save_dataset(dataset_text.splitlines(), dataset_file)
    # --- Model Management ---
    st.subheader("Model Management")
    model_name = st.selectbox("Select a model", get_model_list())
    if st.button("Download model"):
        # Illustrative endpoint kept from the original intent; the Hub may not
        # serve model archives at a URL like this
        url = f"{MODEL_URL}/{model_name}/download"
        file_name = model_name.replace("/", "-") + ".tar.gz"
        download_file(url, file_name)
    predict_input = st.text_area("Enter text to classify:")
    if st.button("Predict text") and predict_input:
        model, tokenizer = load_model(model_name)
        probs = predict_text(model, tokenizer, predict_input)
        st.write(probs)
    save_name = st.text_input("Enter directory to save model:")
    if st.button("Save model") and save_name:
        model, tokenizer = load_model(model_name)
        save_model(model, tokenizer, save_name)
    # --- Saved Model Management ---
    st.subheader("Saved Model Management")
    saved_file = st.text_input("Enter saved model file name:")
    if saved_file:
        model, tokenizer = load_saved_model(saved_file)
    if st.button("Delete model") and saved_file:
        delete_model(saved_file)
    # --- Task Management ---
    st.subheader("Task Management")
    task = st.text_input("Enter task:")
    description = st.text_area("Enter description:")
    if st.button("Add task") and task:
        add_task({"task": task, "description": description})
    if st.button("Show tasks"):
        tasks = get_tasks()
        st.write(tasks)
    # number_input returns a float; cast to int so it can index the task list
    task_id = int(st.number_input("Enter task ID:", min_value=0, step=1))
    if st.button("Complete task"):
        complete_task(task_id)
    if st.button("Delete task"):
        delete_task(task_id)
    # --- Pipeline Management ---
    st.subheader("Pipeline Management")
    if st.button("Run pipeline") and not PIPELINE_RUNNING:
        Thread(target=run_pipeline).start()
    if st.button("Stop pipeline") and PIPELINE_RUNNING:
        stop_pipeline()
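    # Caveat (an assumption about Streamlit internals, not from the original
    # code): st.write calls from a background thread are not attached to this
    # script run; passing the Thread through
    # streamlit.runtime.scriptrunner.add_script_run_ctx before start() is
    # typically needed for run_pipeline's output to render.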
    # --- Console Management ---
    st.subheader("Console Management")
    if st.button("Clear console"):
        st.write("")  # effectively a no-op; each Streamlit rerun already clears old output
    if st.button("Quit"):
        sys.exit()
if __name__ == "__main__":
    main()