|
import gradio as gr |
|
import json |
|
from datetime import datetime |
|
import random |
|
import string |
|
import re |
|
from google.oauth2 import service_account |
|
from googleapiclient.discovery import build |
|
from googleapiclient.http import MediaFileUpload |
|
from huggingface_hub import snapshot_download |
|
|
|
class ChatbotApp: |
|
def __init__(self, repo_name, model_file, service_account_file, folder_id): |
|
self.repo_name = repo_name |
|
self.model_file = model_file |
|
self.service_account_file = service_account_file |
|
self.folder_id = folder_id |
|
self.chat_history = [] |
|
self.chat_log_history = [] |
|
self.isFirstRun = True |
|
self.initContext = "Your initial context here" |
|
self.context = "" |
|
self.Name = "" |
|
self.Occupation = "" |
|
self.Ethnicity = "" |
|
self.Gender = "" |
|
self.Age = "" |
|
self.YearsOfExp = "" |
|
self.agreed = False |
|
self.unique_id = self.generate_unique_id() |
|
self.service = self.get_drive_service() |
|
self.app = self.create_app() |
|
self.snapshot_download_model() |
|
|
|
def snapshot_download_model(self): |
|
print(f'Fetching model: {self.repo_name}, {self.model_file}') |
|
snapshot_download(repo_id=self.repo_name, local_dir=".", allow_patterns=[self.model_file]) |
|
print('Done fetching model.') |
|
|
|
def generate_unique_id(self): |
|
letters = ''.join(random.choices(string.ascii_letters, k=3)) |
|
digits = ''.join(random.choices(string.digits, k=3)) |
|
return letters + digits |
|
|
|
def get_drive_service(self): |
|
credentials = service_account.Credentials.from_service_account_file( |
|
self.service_account_file, scopes=['https://www.googleapis.com/auth/drive']) |
|
service = build('drive', 'v3', credentials=credentials) |
|
print("Google Drive service created.") |
|
return service |
|
|
|
def search_file(self): |
|
query = f"name = '{self.chat_log_name}' and '{self.folder_id}' in parents and trashed = false" |
|
response = self.service.files().list(q=query, spaces='drive', fields='files(id, name)').execute() |
|
return response.get('files', []) |
|
|
|
def strip_text(self, text): |
|
pattern = r"\(.*?\)|<.*?>.*" |
|
cleaned_text = re.sub(pattern, "", text) |
|
return cleaned_text |
|
|
|
def upload_to_google_drive(self): |
|
self.chat_log_name = f'chat_log_for_{self.unique_id}_{datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}.json' |
|
existing_files = self.search_file() |
|
data = { |
|
"Unique ID": self.unique_id, |
|
"chat_history": self.chat_log_history |
|
} |
|
|
|
with open(self.chat_log_name, "w") as log_file: |
|
json.dump(data, log_file, indent=4) |
|
|
|
if not existing_files: |
|
file_metadata = { |
|
'name': self.chat_log_name, |
|
'parents': [self.folder_id], |
|
'mimeType': 'application/json' |
|
} |
|
media = MediaFileUpload(self.chat_log_name, mimetype='application/json') |
|
file = self.service.files().create(body=file_metadata, media_body=media, fields='id').execute() |
|
print(f"Uploaded new file with ID: {file.get('id')}") |
|
else: |
|
file_id = existing_files[0]['id'] |
|
media = MediaFileUpload(self.chat_log_name, mimetype='application/json') |
|
updated_file = self.service.files().update(fileId=file_id, media_body=media).execute() |
|
print(f"Updated existing file with ID: {updated_file.get('id')}") |
|
|
|
def generate(self, prompt, history): |
|
if not self.agreed: |
|
output = "Please agree to the terms and conditions to start chatting." |
|
self.chat_history.append(("System", output)) |
|
else: |
|
if self.isFirstRun: |
|
self.context = self.initContext |
|
self.isFirstRun = False |
|
|
|
cleaned_prompt = self.strip_text(prompt) |
|
cleaned_response = f"Simulated response for: {cleaned_prompt}" |
|
|
|
self.chat_history.append((cleaned_prompt, cleaned_response)) |
|
self.chat_log_history.append({"user": cleaned_prompt, "bot": cleaned_response}) |
|
self.upload_to_google_drive() |
|
|
|
return self.chat_history |
|
|
|
def reset_all(self): |
|
self.chat_history = [] |
|
self.chat_log_history = [] |
|
self.isFirstRun = True |
|
self.unique_id = self.generate_unique_id() |
|
print("All components have been reset.") |
|
return "Reset completed." |
|
|
|
def create_app(self): |
|
with gr.Blocks() as app: |
|
gr.Markdown("## Chatbot Interface") |
|
agree_status = gr.Checkbox(label="I agree to the terms and conditions.") |
|
chatbot_interface = gr.Chatbot() |
|
msg_input = gr.Textbox(label="Type your message here") |
|
send_button = gr.Button("Send") |
|
reset_button = gr.Button("Reset") |
|
|
|
def update_agreed_status(status): |
|
self.agreed = status |
|
return "You can now start chatting." |
|
|
|
agree_status.change(update_agreed_status, inputs=[agree_status], outputs=[]) |
|
|
|
send_button.click(self.generate, inputs=[msg_input, chatbot_interface], outputs=chatbot_interface) |
|
reset_button.click(self.reset_all, inputs=[], outputs=[]) |
|
|
|
return app |
|
|
|
def launch(self): |
|
self.app.launch(debug=True) |
|
|
|
|
|
repo_name = 'YourRepoName/YourModelName' |
|
model_file = "YourModelFileName" |
|
service_account_file = '/path/to/your/service_account_creds.json' |
|
folder_id = 'YourGoogleDriveFolderID' |
|
|
|
chatbot_app = ChatbotApp(repo_name, model_file, service_account_file, folder_id) |
|
chatbot_app.launch() |