import gradio as gr import json from datetime import datetime import random import string import re from google.oauth2 import service_account from googleapiclient.discovery import build from googleapiclient.http import MediaFileUpload from huggingface_hub import snapshot_download class ChatbotApp: def __init__(self, repo_name, model_file, service_account_file, folder_id): self.repo_name = repo_name self.model_file = model_file self.service_account_file = service_account_file self.folder_id = folder_id self.chat_history = [] self.chat_log_history = [] self.isFirstRun = True self.initContext = "Your initial context here" self.context = "" self.Name = "" self.Occupation = "" self.Ethnicity = "" self.Gender = "" self.Age = "" self.YearsOfExp = "" self.agreed = False self.unique_id = self.generate_unique_id() self.service = self.get_drive_service() self.app = self.create_app() self.snapshot_download_model() def snapshot_download_model(self): print(f'Fetching model: {self.repo_name}, {self.model_file}') snapshot_download(repo_id=self.repo_name, local_dir=".", allow_patterns=[self.model_file]) print('Done fetching model.') def generate_unique_id(self): letters = ''.join(random.choices(string.ascii_letters, k=3)) digits = ''.join(random.choices(string.digits, k=3)) return letters + digits def get_drive_service(self): credentials = service_account.Credentials.from_service_account_file( self.service_account_file, scopes=['https://www.googleapis.com/auth/drive']) service = build('drive', 'v3', credentials=credentials) print("Google Drive service created.") return service def search_file(self): query = f"name = '{self.chat_log_name}' and '{self.folder_id}' in parents and trashed = false" response = self.service.files().list(q=query, spaces='drive', fields='files(id, name)').execute() return response.get('files', []) def strip_text(self, text): pattern = r"\(.*?\)|<.*?>.*" cleaned_text = re.sub(pattern, "", text) return cleaned_text def upload_to_google_drive(self): self.chat_log_name = f'chat_log_for_{self.unique_id}_{datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}.json' existing_files = self.search_file() data = { "Unique ID": self.unique_id, "chat_history": self.chat_log_history } with open(self.chat_log_name, "w") as log_file: json.dump(data, log_file, indent=4) if not existing_files: file_metadata = { 'name': self.chat_log_name, 'parents': [self.folder_id], 'mimeType': 'application/json' } media = MediaFileUpload(self.chat_log_name, mimetype='application/json') file = self.service.files().create(body=file_metadata, media_body=media, fields='id').execute() print(f"Uploaded new file with ID: {file.get('id')}") else: file_id = existing_files[0]['id'] media = MediaFileUpload(self.chat_log_name, mimetype='application/json') updated_file = self.service.files().update(fileId=file_id, media_body=media).execute() print(f"Updated existing file with ID: {updated_file.get('id')}") def generate(self, prompt, history): if not self.agreed: output = "Please agree to the terms and conditions to start chatting." self.chat_history.append(("System", output)) else: if self.isFirstRun: self.context = self.initContext self.isFirstRun = False cleaned_prompt = self.strip_text(prompt) cleaned_response = f"Simulated response for: {cleaned_prompt}" # Placeholder for actual model response self.chat_history.append((cleaned_prompt, cleaned_response)) self.chat_log_history.append({"user": cleaned_prompt, "bot": cleaned_response}) self.upload_to_google_drive() return self.chat_history def reset_all(self): self.chat_history = [] self.chat_log_history = [] self.isFirstRun = True self.unique_id = self.generate_unique_id() print("All components have been reset.") return "Reset completed." def create_app(self): with gr.Blocks() as app: gr.Markdown("## Chatbot Interface") agree_status = gr.Checkbox(label="I agree to the terms and conditions.") chatbot_interface = gr.Chatbot() msg_input = gr.Textbox(label="Type your message here") send_button = gr.Button("Send") reset_button = gr.Button("Reset") def update_agreed_status(status): self.agreed = status return "You can now start chatting." agree_status.change(update_agreed_status, inputs=[agree_status], outputs=[]) send_button.click(self.generate, inputs=[msg_input, chatbot_interface], outputs=chatbot_interface) reset_button.click(self.reset_all, inputs=[], outputs=[]) return app def launch(self): self.app.launch(debug=True) # Example usage repo_name = 'YourRepoName/YourModelName' model_file = "YourModelFileName" service_account_file = '/path/to/your/service_account_creds.json' folder_id = 'YourGoogleDriveFolderID' chatbot_app = ChatbotApp(repo_name, model_file, service_account_file, folder_id) chatbot_app.launch()