import os
from threading import Thread
from time import perf_counter
from typing import List

import numpy as np
import gradio as gr
from transformers import AutoTokenizer, TextIteratorStreamer
from optimum.intel.openvino import OVModelForCausalLM
from flask import Flask, render_template, redirect, url_for, request, flash
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash

from config import SUPPORTED_LLM_MODELS
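# Minimal Flask app providing username/password signup and login for the demo.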
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
db = SQLAlchemy(app)
login_manager = LoginManager()
login_manager.init_app(app)
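# Unauthenticated requests to @login_required views are redirected to the 'login' route.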
login_manager.login_view = 'login'
class User(UserMixin, db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    # The signup form below only collects a username and password, so email is optional.
    email = db.Column(db.String(120), unique=True, nullable=True)
    # Stores the salted hash produced by generate_password_hash(), never the plain text.
    password = db.Column(db.String(255), nullable=False)

    def __repr__(self):
        return '<User %r>' % self.username
# Create the database tables
with app.app_context():
db.create_all()
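# flask_login calls this to reload the logged-in user from the session cookie.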
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
@app.route('/signup', methods=['GET', 'POST'])
def signup():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
        # Werkzeug no longer supports the plain 'sha256' method; use PBKDF2.
        hashed_password = generate_password_hash(password, method='pbkdf2:sha256')
new_user = User(username=username, password=hashed_password)
db.session.add(new_user)
db.session.commit()
flash('Signup successful!', 'success')
return redirect(url_for('login'))
return render_template('signup.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
user = User.query.filter_by(username=username).first()
if user and check_password_hash(user.password, password):
login_user(user)
return redirect(url_for('dashboard'))
flash('Invalid username or password', 'danger')
return render_template('login.html')
@app.route('/dashboard')
@login_required
def dashboard():
return render_template('dashboard.html', name=current_user.username)
@app.route('/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('login'))
# Note: the Flask auth app is not launched here. Calling app.run() at this
# point would block and prevent the OpenVINO/Gradio demo below from ever
# starting; run the auth app separately if needed.
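# Question-answering demo: phi-2 with INT8-compressed weights served via OpenVINO.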
# Path to the locally exported INT8 weights; the MODEL_DIR environment
# variable can override this machine-specific default.
model_dir = os.environ.get("MODEL_DIR", "C:/Users/KIIT/OneDrive/Desktop/INTEL/phi-2/INT8_compressed_weights")
print(f"Checking model directory: {model_dir}")
print(f"Contents: {os.listdir(model_dir)}")  # Fails early if the path is wrong
print(f"Loading model from {model_dir}")
model_name = "susnato/phi-2"
model_configuration = SUPPORTED_LLM_MODELS["phi-2"]
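# The LATENCY hint with a single stream optimizes CPU inference for
# time-to-first-token; an empty CACHE_DIR disables OpenVINO's model cache.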
ov_config = {"PERFORMANCE_HINT": "LATENCY", "NUM_STREAMS": "1", "CACHE_DIR": ""}
ov_model = OVModelForCausalLM.from_pretrained(
model_dir,
device="CPU",
ov_config=ov_config,
)
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer_kwargs = model_configuration.get("toeknizer_kwargs", {})
# Continue with your tokenizer usage
response_key = model_configuration.get("response_key")
tokenizer_response_key = None
def get_special_token_id(tokenizer: AutoTokenizer, key: str) -> int:
"""
Gets the token ID for a given string that has been added to the tokenizer as a special token.
Args:
tokenizer (PreTrainedTokenizer): the tokenizer
key (str): the key to convert to a single token
Raises:
ValueError: if more than one ID was generated
Returns:
int: the token ID for the given key
"""
token_ids = tokenizer.encode(key)
if len(token_ids) > 1:
raise ValueError(f"Expected only a single token for '{key}' but found {token_ids}")
return token_ids[0]
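# Illustrative usage (assuming "### End" was registered as a special token):
#   get_special_token_id(tokenizer, "### End") -> the single token ID for "### End"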
if response_key is not None:
tokenizer_response_key = next(
(token for token in tokenizer.additional_special_tokens if token.startswith(response_key)),
None,
)
end_key_token_id = None
if tokenizer_response_key:
try:
end_key = model_configuration.get("end_key")
if end_key:
            end_key_token_id = get_special_token_id(tokenizer, end_key)
# Ensure generation stops once it generates "### End"
except ValueError:
pass
prompt_template = model_configuration.get("prompt_template", "{instruction}")
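# Fall back to the tokenizer's EOS token when the config defines no explicit
# end key, and reuse that token for padding so generate() has a valid pad_token_id.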
end_key_token_id = end_key_token_id or tokenizer.eos_token_id
pad_token_id = end_key_token_id or tokenizer.pad_token_id
def estimate_latency(
current_time: float,
current_perf_text: str,
new_gen_text: str,
per_token_time: List[float],
num_tokens: int,
):
"""
Helper function for performance estimation
Parameters:
current_time (float): This step time in seconds.
current_perf_text (str): Current content of performance UI field.
new_gen_text (str): New generated text.
per_token_time (List[float]): history of performance from previous steps.
num_tokens (int): Total number of generated tokens.
Returns:
        updated content for the performance text field
        updated total number of generated tokens
"""
num_current_toks = len(tokenizer.encode(new_gen_text))
num_tokens += num_current_toks
per_token_time.append(num_current_toks / current_time)
if len(per_token_time) > 10 and len(per_token_time) % 4 == 0:
        current_bucket = per_token_time[-10:]  # average over the 10 most recent measurements
return (
f"Average generation speed: {np.mean(current_bucket):.2f} tokens/s. Total generated tokens: {num_tokens}",
num_tokens,
)
return current_perf_text, num_tokens
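# Illustrative call (hypothetical values): a step that took 0.5 s and produced
# "Hello world" updates the running tokens/s history and the token count:
#   perf_text, num_tokens = estimate_latency(0.5, perf_text, "Hello world", per_token_time, num_tokens)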
def run_generation(
user_text: str,
top_p: float,
temperature: float,
top_k: int,
max_new_tokens: int,
perf_text: str,
):
"""
Text generation function
Parameters:
user_text (str): User-provided instruction for a generation.
top_p (float): Nucleus sampling. If set to < 1, only the smallest set of most probable tokens with probabilities that add up to top_p or higher are kept for a generation.
        temperature (float): The value used to modulate the logits distribution.
top_k (int): The number of highest probability vocabulary tokens to keep for top-k-filtering.
max_new_tokens (int): Maximum length of generated sequence.
perf_text (str): Content of text field for printing performance results.
Returns:
model_output (str) - model-generated text
        perf_text (str) - updated performance text field content
"""
# Prepare input prompt according to model expected template
prompt_text = prompt_template.format(instruction=user_text)
# Tokenize the user text.
model_inputs = tokenizer(prompt_text, return_tensors="pt", **tokenizer_kwargs)
    # Start generation on a separate thread, so that we don't block the UI. The
    # text is pulled from the streamer in the main thread.
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
generate_kwargs = dict(
model_inputs,
streamer=streamer,
max_new_tokens=max_new_tokens,
do_sample=True,
top_p=top_p,
temperature=float(temperature),
top_k=top_k,
eos_token_id=end_key_token_id,
pad_token_id=pad_token_id,
)
t = Thread(target=ov_model.generate, kwargs=generate_kwargs)
t.start()
# Pull the generated text from the streamer, and update the model output.
model_output = ""
per_token_time = []
num_tokens = 0
start = perf_counter()
for new_text in streamer:
current_time = perf_counter() - start
model_output += new_text
perf_text, num_tokens = estimate_latency(current_time, perf_text, new_text, per_token_time, num_tokens)
yield model_output, perf_text
start = perf_counter()
return model_output, perf_text
def reset_textbox(instruction: str, response: str, perf: str):
"""
Helper function for resetting content of all text fields
Parameters:
instruction (str): Content of user instruction field.
response (str): Content of model response field.
        perf (str): Content of the performance info field.
Returns:
empty string for each placeholder
"""
return "", "", ""
examples = [
"Give me a recipe for pizza with pineapple",
"Write me a tweet about the new OpenVINO release",
"Explain the difference between CPU and GPU",
"Give five ideas for a great weekend with family",
"Do Androids dream of Electric sheep?",
"Who is Dolly?",
"Please give me advice on how to write resume?",
"Name 3 advantages to being a cat",
"Write instructions on how to become a good AI engineer",
"Write a love letter to my best friend",
]
def main():
with gr.Blocks() as demo:
gr.Markdown(
"# Question Answering with Model and OpenVINO.\n"
"Provide instruction which describes a task below or select among predefined examples and model writes response that performs requested task."
)
with gr.Row():
with gr.Column(scale=4):
user_text = gr.Textbox(
placeholder="Write an email about an alpaca that likes flan",
label="User instruction",
)
model_output = gr.Textbox(label="Model response", interactive=False)
performance = gr.Textbox(label="Performance", lines=1, interactive=False)
with gr.Column(scale=1):
button_clear = gr.Button(value="Clear")
button_submit = gr.Button(value="Submit")
gr.Examples(examples, user_text)
with gr.Column(scale=1):
max_new_tokens = gr.Slider(
minimum=1,
maximum=1000,
value=256,
step=1,
interactive=True,
label="Max New Tokens",
)
top_p = gr.Slider(
minimum=0.05,
maximum=1.0,
value=0.92,
step=0.05,
interactive=True,
label="Top-p (nucleus sampling)",
)
top_k = gr.Slider(
minimum=0,
maximum=50,
value=0,
step=1,
interactive=True,
label="Top-k",
)
temperature = gr.Slider(
minimum=0.1,
maximum=5.0,
value=0.8,
step=0.1,
interactive=True,
label="Temperature",
)
user_text.submit(
run_generation,
[user_text, top_p, temperature, top_k, max_new_tokens, performance],
[model_output, performance],
)
button_submit.click(
run_generation,
[user_text, top_p, temperature, top_k, max_new_tokens, performance],
[model_output, performance],
)
button_clear.click(
reset_textbox,
[user_text, model_output, performance],
[user_text, model_output, performance],
)
    demo.queue()
    try:
        demo.launch(height=800)
    except Exception:
        demo.launch(share=True, height=800)


# Call main function to start the Gradio interface
if __name__ == "__main__":
    main()