"""Flask login front-end plus a Gradio question-answering demo backed by OpenVINO.

The module defines two independent web apps:

* a small Flask application with signup / login / logout / dashboard routes
  that stores users in a local SQLite database, and
* a Gradio UI that streams text generated by an OpenVINO-optimized causal LM.

Importing the module creates the database tables and loads the tokenizer and
model; the servers themselves are only started when the file is run as a script.
"""
import os
from threading import Thread
from time import perf_counter
from typing import List

import gradio as gr
import numpy as np
from flask import Flask, render_template, redirect, url_for, request, flash
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from flask_sqlalchemy import SQLAlchemy
from optimum.intel.openvino import OVModelForCausalLM
from transformers import AutoConfig, AutoTokenizer, TextIteratorStreamer
from werkzeug.security import generate_password_hash, check_password_hash

from config import SUPPORTED_LLM_MODELS
# NOTE: the four helpers imported here are re-defined further down in this
# file; the local definitions shadow the imported ones.
from generation_utils import run_generation, estimate_latency, reset_textbox, get_special_token_id


# ---------------------------------------------------------------------------
# Flask application: user accounts
# ---------------------------------------------------------------------------

app = Flask(__name__)
# The secret key signs session cookies, so it must not be a published
# placeholder. Read it from the environment; fall back to a random per-process
# key (sessions are then invalidated on restart).
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or os.urandom(32).hex()
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
db = SQLAlchemy(app)

login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'


class User(UserMixin, db.Model):
    """Account record.

    ``UserMixin`` supplies the attributes Flask-Login expects on the object
    passed to ``login_user`` (``is_authenticated``, ``is_active``, ``get_id``).
    """

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    # The signup form only submits username and password, so e-mail has to be
    # optional or every insert would violate the NOT NULL constraint.
    email = db.Column(db.String(120), unique=True, nullable=True)
    # Stores the Werkzeug password hash (never the clear-text password).
    password = db.Column(db.String(256), nullable=False)

    def __repr__(self):
        return f'<User {self.username!r}>'


# Create the database tables.
# NOTE: create_all() does not alter an existing table; a users.db created
# before the `password` column was added must be recreated or migrated.
with app.app_context():
    db.create_all()


@login_manager.user_loader
def load_user(user_id):
    """Return the ``User`` for the id stored in the session, or ``None``."""
    return User.query.get(int(user_id))


@app.route('/signup', methods=['GET', 'POST'])
def signup():
    """Render the signup form (GET) or create a new account (POST)."""
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        # Optional field; an empty string is stored as NULL so the unique
        # constraint on e-mail is not tripped by several blank values.
        email = request.form.get('email') or None

        # Reject duplicates up front instead of failing on the unique constraint.
        if User.query.filter_by(username=username).first() is not None:
            flash('Username already taken', 'danger')
            return render_template('signup.html')

        # Use Werkzeug's default hashing method; the bare 'sha256' method name
        # is not accepted by current Werkzeug releases.
        hashed_password = generate_password_hash(password)
        new_user = User(username=username, email=email, password=hashed_password)
        db.session.add(new_user)
        db.session.commit()
        flash('Signup successful!', 'success')
        return redirect(url_for('login'))
    return render_template('signup.html')


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form (GET) or authenticate the submitted credentials (POST)."""
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        user = User.query.filter_by(username=username).first()
        if user and check_password_hash(user.password, password):
            login_user(user)
            return redirect(url_for('dashboard'))
        flash('Invalid username or password', 'danger')
    return render_template('login.html')


@app.route('/dashboard')
@login_required
def dashboard():
    """Render the dashboard for the logged-in user."""
    return render_template('dashboard.html', name=current_user.username)


@app.route('/logout')
@login_required
def logout():
    """Log the current user out and send them back to the login page."""
    logout_user()
    return redirect(url_for('login'))


# ---------------------------------------------------------------------------
# Model and tokenizer loading
# ---------------------------------------------------------------------------

# Location of the exported OpenVINO model; override with OV_MODEL_DIR when the
# model lives somewhere other than the original author's machine.
model_dir = os.environ.get(
    "OV_MODEL_DIR",
    "C:/Users/KIIT/OneDrive/Desktop/INTEL/phi-2/INT8_compressed_weights",
)
print(f"Checking model directory: {model_dir}")
print(f"Contents: {os.listdir(model_dir)}")  # Check contents of the directory
print(f"Loading model from {model_dir}")

model_name = "susnato/phi-2"
model_configuration = SUPPORTED_LLM_MODELS["phi-2"]
ov_config = {"PERFORMANCE_HINT": "LATENCY", "NUM_STREAMS": "1", "CACHE_DIR": ""}

# Load the tokenizer once; `tok` is kept as an alias of the same object.
tokenizer = AutoTokenizer.from_pretrained(model_name)
tok = tokenizer
ov_model = OVModelForCausalLM.from_pretrained(
    model_dir,
    device="CPU",
    ov_config=ov_config,
)

# Accept the correctly spelled key and, for backward compatibility, the
# misspelled one that was previously looked up.
tokenizer_kwargs = model_configuration.get(
    "tokenizer_kwargs",
    model_configuration.get("toeknizer_kwargs", {}),
)

response_key = model_configuration.get("response_key")
tokenizer_response_key = None


def get_special_token_id(tokenizer: AutoTokenizer, key: str) -> int:
    """
    Gets the token ID for a given string that has been added to the tokenizer as a special token.

    Args:
        tokenizer (PreTrainedTokenizer): the tokenizer
        key (str): the key to convert to a single token

    Raises:
        ValueError: if the key does not encode to exactly one token ID

    Returns:
        int: the token ID for the given key
    """
    token_ids = tokenizer.encode(key)
    # An empty encoding would otherwise surface as an IndexError, which the
    # caller below does not catch.
    if len(token_ids) != 1:
        raise ValueError(f"Expected only a single token for '{key}' but found {token_ids}")
    return token_ids[0]


if response_key is not None:
    tokenizer_response_key = next(
        (token for token in tokenizer.additional_special_tokens if token.startswith(response_key)),
        None,
    )

end_key_token_id = None
if tokenizer_response_key:
    try:
        end_key = model_configuration.get("end_key")
        if end_key:
            end_key_token_id = get_special_token_id(tokenizer, end_key)
        # Ensure generation stops once it generates "### End"
    except ValueError:
        # The end key is not a single token; fall back to the EOS token below.
        pass

prompt_template = model_configuration.get("prompt_template", "{instruction}")
# Compare against None explicitly: a token ID of 0 is valid but falsy.
if end_key_token_id is None:
    end_key_token_id = tokenizer.eos_token_id
pad_token_id = end_key_token_id if end_key_token_id is not None else tokenizer.pad_token_id


# ---------------------------------------------------------------------------
# Generation helpers
# ---------------------------------------------------------------------------

def estimate_latency(
    current_time: float,
    current_perf_text: str,
    new_gen_text: str,
    per_token_time: List[float],
    num_tokens: int,
):
    """
    Helper function for performance estimation

    Parameters:
      current_time (float): This step time in seconds.
      current_perf_text (str): Current content of performance UI field.
      new_gen_text (str): New generated text.
      per_token_time (List[float]): history of performance from previous steps
        (mutated in place: this step's tokens/s value is appended).
      num_tokens (int): Total number of generated tokens.

    Returns:
      update for performance text field
      update for a total number of tokens
    """
    num_current_toks = len(tokenizer.encode(new_gen_text))
    num_tokens += num_current_toks
    # Guard against a zero-length interval reported by the timer.
    if current_time > 0:
        per_token_time.append(num_current_toks / current_time)
    # Refresh the displayed speed every fourth step once enough samples exist.
    if len(per_token_time) > 10 and len(per_token_time) % 4 == 0:
        # Average over the 10 most recent samples (the previous slice,
        # [:-10], averaged everything *except* the most recent 10).
        current_bucket = per_token_time[-10:]
        return (
            f"Average generation speed: {np.mean(current_bucket):.2f} tokens/s. Total generated tokens: {num_tokens}",
            num_tokens,
        )
    return current_perf_text, num_tokens


def run_generation(
    user_text: str,
    top_p: float,
    temperature: float,
    top_k: int,
    max_new_tokens: int,
    perf_text: str,
):
    """
    Text generation function (a generator: yields after every streamed chunk)

    Parameters:
      user_text (str): User-provided instruction for a generation.
      top_p (float):  Nucleus sampling. If set to < 1, only the smallest set of most probable tokens with probabilities that add up to top_p or higher are kept for a generation.
      temperature (float): The value used to module the logits distribution.
      top_k (int): The number of highest probability vocabulary tokens to keep for top-k-filtering.
      max_new_tokens (int): Maximum length of generated sequence.
      perf_text (str): Content of text field for printing performance results.
    Returns:
      model_output (str) - model-generated text
      perf_text (str) - updated perf text filed content
    """
    # Prepare input prompt according to model expected template
    prompt_text = prompt_template.format(instruction=user_text)

    # Tokenize the user text.
    model_inputs = tokenizer(prompt_text, return_tensors="pt", **tokenizer_kwargs)

    # Start generation on a separate thread, so that we don't block the UI. The text is pulled from the streamer
    # in the main thread. No timeout is configured on the streamer, so iteration waits until generation finishes.
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = dict(
        model_inputs,
        streamer=streamer,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        top_p=top_p,
        temperature=float(temperature),
        top_k=top_k,
        eos_token_id=end_key_token_id,
        pad_token_id=pad_token_id,
    )
    t = Thread(target=ov_model.generate, kwargs=generate_kwargs)
    t.start()

    # Pull the generated text from the streamer, and update the model output.
    model_output = ""
    per_token_time = []
    num_tokens = 0
    start = perf_counter()
    for new_text in streamer:
        current_time = perf_counter() - start
        model_output += new_text
        perf_text, num_tokens = estimate_latency(current_time, perf_text, new_text, per_token_time, num_tokens)
        yield model_output, perf_text
        # Restart the clock so each sample measures one streamed chunk only.
        start = perf_counter()
    # The streamer is exhausted, so generation is finishing; reap the worker.
    t.join()
    return model_output, perf_text


def reset_textbox(instruction: str, response: str, perf: str):
    """
    Helper function for resetting content of all text fields

    Parameters:
      instruction (str): Content of user instruction field.
      response (str): Content of model response field.
      perf (str): Content of performance info filed

    Returns:
      empty string for each placeholder
    """
    return "", "", ""


examples = [
    "Give me a recipe for pizza with pineapple",
    "Write me a tweet about the new OpenVINO release",
    "Explain the difference between CPU and GPU",
    "Give five ideas for a great weekend with family",
    "Do Androids dream of Electric sheep?",
    "Who is Dolly?",
    "Please give me advice on how to write resume?",
    "Name 3 advantages to being a cat",
    "Write instructions on how to become a good AI engineer",
    "Write a love letter to my best friend",
]


# ---------------------------------------------------------------------------
# Gradio UI
# ---------------------------------------------------------------------------

def _build_demo():
    """Assemble the Gradio Blocks UI and wire its events; does not launch it."""
    with gr.Blocks() as demo:
        gr.Markdown(
            "# Question Answering with Model and OpenVINO.\n"
            "Provide instruction which describes a task below or select among predefined examples and model writes response that performs requested task."
        )

        with gr.Row():
            with gr.Column(scale=4):
                user_text = gr.Textbox(
                    placeholder="Write an email about an alpaca that likes flan",
                    label="User instruction",
                )
                model_output = gr.Textbox(label="Model response", interactive=False)
                performance = gr.Textbox(label="Performance", lines=1, interactive=False)
                with gr.Column(scale=1):
                    button_clear = gr.Button(value="Clear")
                    button_submit = gr.Button(value="Submit")
                gr.Examples(examples, user_text)
            with gr.Column(scale=1):
                max_new_tokens = gr.Slider(
                    minimum=1,
                    maximum=1000,
                    value=256,
                    step=1,
                    interactive=True,
                    label="Max New Tokens",
                )
                top_p = gr.Slider(
                    minimum=0.05,
                    maximum=1.0,
                    value=0.92,
                    step=0.05,
                    interactive=True,
                    label="Top-p (nucleus sampling)",
                )
                top_k = gr.Slider(
                    minimum=0,
                    maximum=50,
                    value=0,
                    step=1,
                    interactive=True,
                    label="Top-k",
                )
                temperature = gr.Slider(
                    minimum=0.1,
                    maximum=5.0,
                    value=0.8,
                    step=0.1,
                    interactive=True,
                    label="Temperature",
                )

        # Pressing Enter in the textbox and clicking Submit trigger the same handler.
        generation_inputs = [user_text, top_p, temperature, top_k, max_new_tokens, performance]
        generation_outputs = [model_output, performance]
        user_text.submit(run_generation, generation_inputs, generation_outputs)
        button_submit.click(run_generation, generation_inputs, generation_outputs)
        button_clear.click(
            reset_textbox,
            [user_text, model_output, performance],
            [user_text, model_output, performance],
        )
    return demo


def main():
    """Build the Gradio demo and serve it."""
    demo = _build_demo()
    demo.queue()
    try:
        demo.launch(height=800)
    except Exception:
        # Best-effort fallback kept from the original script: if the local
        # launch fails, retry with a public share link.
        demo.launch(share=True, height=800)


if __name__ == "__main__":
    # app.run() blocks, so the Flask server runs in a daemon thread to let the
    # Gradio UI start as well. Debug mode is off: it exposes Werkzeug's
    # interactive debugger, and its reloader cannot run outside the main thread.
    Thread(
        target=app.run,
        kwargs={"debug": False, "use_reloader": False},
        daemon=True,
    ).start()
    # Start the Gradio interface.
    main()