Spaces:
Runtime error
Runtime error
| import os | |
| from transformers import AutoTokenizer, AutoConfig | |
| from optimum.intel.openvino import OVModelForCausalLM | |
| from generation_utils import run_generation, estimate_latency, reset_textbox,get_special_token_id | |
| from config import SUPPORTED_LLM_MODELS | |
| import gradio as gr | |
| from threading import Thread | |
| from time import perf_counter | |
| from typing import List | |
| from transformers import AutoTokenizer, TextIteratorStreamer | |
| import numpy as np | |
| import os | |
| from flask import Flask, render_template, redirect, url_for, request, flash | |
| from flask_sqlalchemy import SQLAlchemy | |
| from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user | |
| from werkzeug.security import generate_password_hash, check_password_hash | |
# Flask application plus the database and login-manager extensions bound to it.
app = Flask(__name__)
# The secret key signs the session cookie, so it must not be a value that is
# published with the source. Take it from the SECRET_KEY environment variable;
# when that is unset or empty, fall back to a random per-process key (sessions
# then stop being valid after a restart, which is acceptable for local runs).
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or os.urandom(32).hex()
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
db = SQLAlchemy(app)

login_manager = LoginManager()
login_manager.init_app(app)
# Endpoint name that the login manager is configured to use as its login view.
login_manager.login_view = 'login'
class User(UserMixin, db.Model):
    """Account record used by the signup/login views.

    ``UserMixin`` supplies the attributes and methods Flask-Login expects on a
    user object (``is_authenticated``, ``get_id()``, ...), which ``login_user``
    relies on.
    """

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    # Nullable because the signup view in this file creates users without an
    # email; a NOT NULL constraint would make every signup commit fail.
    email = db.Column(db.String(120), unique=True, nullable=True)
    # Hash produced by generate_password_hash(); signup() stores it and
    # login() verifies against it. Never holds the plain-text password.
    password = db.Column(db.String(255), nullable=False)

    def __repr__(self):
        return f'<User {self.username!r}>'


# Create the database tables.
# NOTE(review): create_all() only creates missing tables; an existing users.db
# built from the old model will not gain the new `password` column on its own.
with app.app_context():
    db.create_all()
@login_manager.user_loader
def load_user(user_id):
    """Return the ``User`` for the id stored in the session, or ``None``.

    Registered as the Flask-Login user loader so ``current_user`` can be
    resolved on each request.

    Args:
        user_id: the user id as stored in the session (converted with ``int``).

    Returns:
        The matching ``User``, or ``None`` when the id is not a valid integer
        or no such row exists.
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        # A malformed id means "no such user", not a server error.
        return None
    return User.query.get(primary_key)
# NOTE(review): the URL path is assumed to mirror the endpoint name; confirm
# against the links used in the templates.
@app.route('/signup', methods=['GET', 'POST'])
def signup():
    """Render the signup form and create an account on POST.

    On POST, reads ``username`` and ``password`` from the submitted form,
    stores the user with a hashed password, flashes a success message and
    redirects to the ``login`` endpoint. If the username is already present,
    flashes an error and re-renders the form instead of failing on the unique
    constraint at commit time.

    Returns:
        A redirect to ``login`` after a successful signup, otherwise the
        rendered ``signup.html`` template.
    """
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']

        if User.query.filter_by(username=username).first() is not None:
            flash('Username already taken', 'danger')
            return render_template('signup.html')

        # The bare 'sha256' method is rejected by current Werkzeug releases;
        # 'pbkdf2:sha256' is the salted, iterated scheme it supports, and
        # check_password_hash() reads the scheme from the stored hash.
        hashed_password = generate_password_hash(password, method='pbkdf2:sha256')
        new_user = User(username=username, password=hashed_password)
        db.session.add(new_user)
        db.session.commit()
        flash('Signup successful!', 'success')
        return redirect(url_for('login'))
    return render_template('signup.html')
# NOTE(review): the URL path is assumed to mirror the endpoint name; confirm
# against the links used in the templates.
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form and authenticate the user on POST.

    Registering the view under the endpoint name ``login`` is what lets
    ``url_for('login')`` and ``login_manager.login_view = 'login'`` resolve.

    On POST, looks the user up by the submitted ``username`` and checks the
    submitted ``password`` against the stored hash. On success the user is
    logged in and redirected to ``dashboard``; otherwise an error is flashed
    and the form is shown again.

    Returns:
        A redirect to ``dashboard`` on success, otherwise the rendered
        ``login.html`` template.
    """
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        user = User.query.filter_by(username=username).first()
        if user and check_password_hash(user.password, password):
            login_user(user)
            return redirect(url_for('dashboard'))
        # Same message for unknown user and wrong password.
        flash('Invalid username or password', 'danger')
    return render_template('login.html')
# NOTE(review): the URL path is assumed to mirror the endpoint name; confirm
# against the links used in the templates.
@app.route('/dashboard')
@login_required
def dashboard():
    """Render the dashboard for the logged-in user.

    ``login_required`` keeps anonymous visitors out; the view reads
    ``current_user.username``, which only exists on a real ``User``.

    Returns:
        The rendered ``dashboard.html`` template with ``name`` set to the
        current user's username.
    """
    return render_template('dashboard.html', name=current_user.username)
# NOTE(review): the URL path is assumed to mirror the endpoint name; confirm
# against the links used in the templates.
@app.route('/logout')
@login_required
def logout():
    """Log the current user out and send them back to the login page.

    Returns:
        A redirect to the ``login`` endpoint.
    """
    logout_user()
    return redirect(url_for('login'))
if __name__ == '__main__':
    # app.run() blocks until the server stops, so calling it directly here
    # would keep every statement below this guard (model loading, the Gradio
    # UI) from running. Serve Flask from a daemon thread instead so the rest of
    # the script continues and the server ends with the process.
    #
    # Debug mode is opt-in through FLASK_DEBUG rather than hard-coded, because
    # the interactive debugger lets anyone who reaches it execute code. The
    # reloader is disabled because it has to run in the main thread.
    _flask_debug = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    Thread(
        target=app.run,
        kwargs={'debug': _flask_debug, 'use_reloader': False},
        daemon=True,
    ).start()
# Directory holding the exported OpenVINO model. The MODEL_DIR environment
# variable overrides the original machine-specific default so the script can
# run on other hosts without editing the source.
model_dir = os.environ.get(
    "MODEL_DIR",
    "C:/Users/KIIT/OneDrive/Desktop/INTEL/phi-2/INT8_compressed_weights",
)
print(f"Checking model directory: {model_dir}")
print(f"Contents: {os.listdir(model_dir)}")  # Check contents of the directory
print(f"Loading model from {model_dir}")

model_name = "susnato/phi-2"
model_configuration = SUPPORTED_LLM_MODELS["phi-2"]
ov_config = {"PERFORMANCE_HINT": "LATENCY", "NUM_STREAMS": "1", "CACHE_DIR": ""}

# Load the tokenizer once; `tok` is kept as an alias of the same object so the
# name stays available without a second from_pretrained() call.
tokenizer = AutoTokenizer.from_pretrained(model_name)
tok = tokenizer

ov_model = OVModelForCausalLM.from_pretrained(
    model_dir,
    device="CPU",
    ov_config=ov_config,
)

# Extra keyword arguments passed to the tokenizer call in run_generation().
# The original lookup used the misspelled key "toeknizer_kwargs"; read the
# correctly spelled key first and keep the misspelling as a fallback in case a
# configuration relies on it. NOTE(review): confirm the key name in config.py.
tokenizer_kwargs = model_configuration.get(
    "tokenizer_kwargs",
    model_configuration.get("toeknizer_kwargs", {}),
)

# Prefix used below to search the tokenizer's additional special tokens;
# tokenizer_response_key stays None until that search finds a match.
response_key = model_configuration.get("response_key")
tokenizer_response_key = None
def get_special_token_id(tokenizer: "AutoTokenizer", key: str) -> int:
    """
    Gets the token ID for a given string that has been added to the tokenizer as a special token.

    Args:
        tokenizer (PreTrainedTokenizer): the tokenizer
        key (str): the key to convert to a single token

    Raises:
        ValueError: if the key does not encode to exactly one ID (none, or more than one)

    Returns:
        int: the token ID for the given key
    """
    token_ids = tokenizer.encode(key)
    # An empty encoding used to fall through to token_ids[0] and raise
    # IndexError; report it as the documented ValueError instead.
    if len(token_ids) != 1:
        raise ValueError(f"Expected only a single token for '{key}' but found {token_ids}")
    return token_ids[0]
# Find the tokenizer's special token that starts with the configured response
# key, if a response key is configured at all.
if response_key is not None:
    tokenizer_response_key = next(
        (token for token in tokenizer.additional_special_tokens if token.startswith(response_key)),
        None,
    )

# Resolve the token id that stops generation. The configured end key is only
# looked up when a response-key token was found above.
end_key_token_id = None
if tokenizer_response_key:
    end_key = model_configuration.get("end_key")
    if end_key:
        try:
            # Ensure generation stops once it generates "### End"
            end_key_token_id = get_special_token_id(tokenizer, end_key)
        except ValueError as err:
            # Best effort: fall back to the tokenizer's EOS token below, but
            # say so instead of hiding the problem.
            print(f"Could not resolve end key {end_key!r} to a single token ({err}); using EOS instead")

prompt_template = model_configuration.get("prompt_template", "{instruction}")

# Compare against None rather than relying on truthiness: 0 is a valid token
# id and must not be replaced by the fallback.
if end_key_token_id is None:
    end_key_token_id = tokenizer.eos_token_id
pad_token_id = end_key_token_id if end_key_token_id is not None else tokenizer.pad_token_id
def estimate_latency(
    current_time: float,
    current_perf_text: str,
    new_gen_text: str,
    per_token_time: List[float],
    num_tokens: int,
):
    """
    Helper function for performance estimation

    Parameters:
      current_time (float): This step time in seconds.
      current_perf_text (str): Current content of performance UI field.
      new_gen_text (str): New generated text.
      per_token_time (List[float]): history of performance from previous steps;
        this step's tokens-per-second value is appended to it in place.
      num_tokens (int): Total number of generated tokens.
    Returns:
      update for performance text field
      update for a total number of tokens
    """
    num_current_toks = len(tokenizer.encode(new_gen_text))
    num_tokens += num_current_toks
    # A step measured as taking no time has no meaningful rate; skip the
    # sample rather than dividing by zero.
    if current_time > 0:
        per_token_time.append(num_current_toks / current_time)
    # Refresh the text on every 4th sample once more than 10 are collected.
    if len(per_token_time) > 10 and len(per_token_time) % 4 == 0:
        # As written, the average covers every sample except the 10 most recent.
        current_bucket = per_token_time[:-10]
        return (
            f"Average generation speed: {np.mean(current_bucket):.2f} tokens/s. Total generated tokens: {num_tokens}",
            num_tokens,
        )
    return current_perf_text, num_tokens
def run_generation(
    user_text: str,
    top_p: float,
    temperature: float,
    top_k: int,
    max_new_tokens: int,
    perf_text: str,
):
    """
    Stream a sampled model response for one user instruction.

    This is a generator: after each chunk of text arrives from the model it
    yields the response accumulated so far together with the current
    performance text.

    Parameters:
      user_text (str): Instruction entered by the user.
      top_p (float): Nucleus-sampling threshold passed to generation.
      temperature (float): Sampling temperature (converted with ``float``).
      top_k (int): Top-k filtering value passed to generation.
      max_new_tokens (int): Upper bound on the number of generated tokens.
      perf_text (str): Current content of the performance text field.
    Yields:
      (model_output, perf_text): accumulated response and performance text.
    """
    # Fill the model's prompt template with the instruction and tokenize it.
    encoded_prompt = tokenizer(
        prompt_template.format(instruction=user_text),
        return_tensors="pt",
        **tokenizer_kwargs,
    )

    # Generation runs on a worker thread so this generator can hand text to
    # the UI as it arrives; the streamer is the channel between the two.
    # No timeout is configured on the streamer.
    token_stream = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    generation_args = dict(encoded_prompt)
    generation_args.update(
        streamer=token_stream,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        top_p=top_p,
        temperature=float(temperature),
        top_k=top_k,
        eos_token_id=end_key_token_id,
        pad_token_id=pad_token_id,
    )
    worker = Thread(target=ov_model.generate, kwargs=generation_args)
    worker.start()

    # Drain the streamer, timing each chunk separately.
    generated_so_far = ""
    speed_history = []
    total_tokens = 0
    chunk_started = perf_counter()
    for chunk in token_stream:
        elapsed = perf_counter() - chunk_started
        generated_so_far += chunk
        perf_text, total_tokens = estimate_latency(elapsed, perf_text, chunk, speed_history, total_tokens)
        yield generated_so_far, perf_text
        # Restart the clock only after the consumer has taken the update.
        chunk_started = perf_counter()
    return generated_so_far, perf_text
def reset_textbox(instruction: str, response: str, perf: str):
    """
    Clear the three text fields of the demo.

    The current field contents are accepted but not used.

    Parameters:
      instruction (str): Content of the user instruction field.
      response (str): Content of the model response field.
      perf (str): Content of the performance info field.
    Returns:
      a tuple of three empty strings, one per field
    """
    return ("",) * 3
# Predefined instructions passed to gr.Examples for the instruction textbox.
examples = [
    "Give me a recipe for pizza with pineapple",
    "Write me a tweet about the new OpenVINO release",
    "Explain the difference between CPU and GPU",
    "Give five ideas for a great weekend with family",
    "Do Androids dream of Electric sheep?",
    "Who is Dolly?",
    "Please give me advice on how to write resume?",
    "Name 3 advantages to being a cat",
    "Write instructions on how to become a good AI engineer",
    "Write a love letter to my best friend",
]
def main():
    """Build the Gradio demo and, when run as a script, launch it.

    The UI has an instruction box, a read-only response box, a read-only
    performance box, Clear/Submit buttons, the predefined examples and four
    sliders for the sampling parameters. Submitting (Enter in the instruction
    box or the Submit button) streams output from ``run_generation``; Clear
    empties the three text boxes through ``reset_textbox``.
    """
    # NOTE(review): the original indentation was lost, so the nesting of the
    # layout containers below is reconstructed from the statement order;
    # confirm it against the intended layout.
    with gr.Blocks() as demo:
        gr.Markdown(
            "# Question Answering with Model and OpenVINO.\n"
            "Provide instruction which describes a task below or select among predefined examples and model writes response that performs requested task."
        )
        with gr.Row():
            # Wide column: text fields, buttons and examples.
            with gr.Column(scale=4):
                user_text = gr.Textbox(
                    placeholder="Write an email about an alpaca that likes flan",
                    label="User instruction",
                )
                model_output = gr.Textbox(label="Model response", interactive=False)
                performance = gr.Textbox(label="Performance", lines=1, interactive=False)
                with gr.Column(scale=1):
                    button_clear = gr.Button(value="Clear")
                    button_submit = gr.Button(value="Submit")
                gr.Examples(examples, user_text)
            # Narrow column: sampling controls.
            with gr.Column(scale=1):
                max_new_tokens = gr.Slider(
                    minimum=1,
                    maximum=1000,
                    value=256,
                    step=1,
                    interactive=True,
                    label="Max New Tokens",
                )
                top_p = gr.Slider(
                    minimum=0.05,
                    maximum=1.0,
                    value=0.92,
                    step=0.05,
                    interactive=True,
                    label="Top-p (nucleus sampling)",
                )
                top_k = gr.Slider(
                    minimum=0,
                    maximum=50,
                    value=0,
                    step=1,
                    interactive=True,
                    label="Top-k",
                )
                temperature = gr.Slider(
                    minimum=0.1,
                    maximum=5.0,
                    value=0.8,
                    step=0.1,
                    interactive=True,
                    label="Temperature",
                )

        # Both submit triggers share the same handler, inputs and outputs.
        generation_inputs = [user_text, top_p, temperature, top_k, max_new_tokens, performance]
        generation_outputs = [model_output, performance]
        user_text.submit(run_generation, generation_inputs, generation_outputs)
        button_submit.click(run_generation, generation_inputs, generation_outputs)

        # Clear reads and writes the same three text fields.
        text_fields = [user_text, model_output, performance]
        button_clear.click(reset_textbox, text_fields, text_fields)

    if __name__ == "__main__":
        demo.queue()
        try:
            demo.launch(height=800)
        except Exception:
            # If the plain launch fails, retry with a public share link.
            demo.launch(share=True, height=800)


# Call main function to start Gradio interface
main()