MAIN_SCRIPT_MODULE
(common_module)

import atexit
import json
import os

import nltk
import pyttsx3
import spacy
import speech_recognition as sr
import torch
from nltk.sentiment import SentimentIntensityAnalyzer
from transformers import GPTNeoForCausalLM, AutoTokenizer

from memory_module import MemoryModule
from sentiment_module import SentimentAnalysisModule

# Get the current directory
current_directory = os.getcwd()

# Get a list of files and directories in the current directory
file_list = os.listdir(current_directory)

# Print the list
for file_name in file_list:
    print(file_name)

# Download the NLTK resources used by this script; the VADER lexicon must be
# fetched before SentimentIntensityAnalyzer is instantiated
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('stopwords')
nltk.download('vader_lexicon')

# Quick smoke test of the VADER sentiment analyzer
sia = SentimentIntensityAnalyzer()
sentence = "This is a positive sentence."
sentiment = sia.polarity_scores(sentence)

# Access sentiment scores (polarity_scores returns 'neg', 'neu', 'pos', and 'compound')
compound_score = sentiment['compound']
positive_score = sentiment['pos']
negative_score = sentiment['neg']

model_directory = "EleutherAI/gpt-neo-125m"
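# The 125M checkpoint above is the smallest public GPT-Neo model; it loads
# quickly and runs on CPU, at the cost of weaker responses than the larger
# 1.3B and 2.7B variants.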

# Check if a GPU is available and set the device accordingly
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
if torch.cuda.is_available():
    current_device = torch.cuda.current_device()
    print(f"Using GPU: {torch.cuda.get_device_name(current_device)}")
else:
    print("No GPU available, using CPU.")

# Initialize the text-to-speech engine
speech_engine = pyttsx3.init()

# Print the available voices
voices = speech_engine.getProperty('voices')
for voice in voices:
    print(voice.id, voice.name)

# Find the ID of the desired voice by name
desired_voice = "Microsoft Hazel Desktop - English (Great Britain)"
voice_id = None
for voice in voices:
    if desired_voice in voice.name:
        voice_id = voice.id
        break

if voice_id:
    speech_engine.setProperty('voice', voice_id)
    print("Desired voice set successfully.")
else:
    print("Desired voice not found; keeping the default voice.")

# Load the spaCy English model
nlp = spacy.load('en_core_web_sm')

# Memory files for the CommonModule instantiation below
load_memory_file = "load_memory.json"
save_memory_file = "save_memory.json"

class CommonModule:
    def __init__(self, model, name, param1, param2, load_memory_file, save_memory_file):
        self.memory = []  # Initialize memory as a list
        self.name = name
        self.param1 = param1  # Creator's name, used in process_input
        self.param2 = param2

        # Load the GPT-Neo model and tokenizer once and move the model to the
        # selected device (GPU or CPU)
        self.tokenizer = AutoTokenizer.from_pretrained(model)
        self.tokenizer.pad_token = self.tokenizer.eos_token  # Reuse EOS as the padding token
        self.model = GPTNeoForCausalLM.from_pretrained(model)
        self.model.to(device)

        # Use the paths passed in rather than hard-coded absolute paths
        self.load_memory_file = load_memory_file
        self.save_memory_file = save_memory_file
        self.memory_module = MemoryModule(self.load_memory_file, self.save_memory_file)

        self.sentiment_module = SentimentAnalysisModule()
        self.speech_engine = speech_engine  # Reuse the engine initialized above

        self.max_sequence_length = 200  # Maximum new tokens to generate; lower is faster
        self.num_beams = 4  # Fewer beams means faster responses
        self.no_repeat_ngram_size = 2
        self.temperature = 0.3  # Only takes effect when sampling (do_sample=True)
        self.response_cache = {}  # Cache for frequently occurring responses

        # Initialize speech recognition
        self.recognizer = sr.Recognizer()

    def reset_conversation(self):
        self.memory_module.reset_memory()

    def retrieve_cached_response(self, input_text):
        # Responses are cached under the exact input text, so try that first
        cached = self.response_cache.get(input_text)
        if cached:
            return cached
        # Fall back to any response cached under a remembered named entity
        # that appears in the input
        named_entities = self.memory_module.get_named_entities()
        for entity in named_entities:
            if entity.lower() in input_text.lower():
                return self.response_cache.get(entity)
        return None

    def generate_response(self, input_text, conversation_history):
        # Assemble the history (alternating user/bot turns, oldest first) into
        # the prompt format, ending with the new user input and an open BOT turn
        prompt = ""
        for i in range(0, len(conversation_history) - 1, 2):
            prompt += "USER: " + conversation_history[i] + "\n"
            prompt += "BOT: " + conversation_history[i + 1] + "\n"
        prompt += "USER: " + input_text + "\nBOT:"

        # Tokenize the prompt and move it to the same device as the model
        input_ids = self.tokenizer.encode(prompt, return_tensors='pt').to(device)

        # Generate a continuation with the GPT-Neo model
        with torch.no_grad():
            output = self.model.generate(
                input_ids,
                max_new_tokens=self.max_sequence_length,
                num_beams=self.num_beams,
                no_repeat_ngram_size=self.no_repeat_ngram_size,
                pad_token_id=self.tokenizer.eos_token_id,
                num_return_sequences=1,
            )

        # Decode only the newly generated tokens (everything after the prompt)
        generated_text = self.tokenizer.decode(output[0, input_ids.shape[-1]:], skip_special_tokens=True)

        # Keep the first line of the continuation (the bot's reply), cutting
        # off any further "USER:" turn the model may invent
        response = generated_text.strip().split("\n")[0]
        if "USER:" in response:
            response = response.split("USER:")[0]
        return response.strip()
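
    # Illustration: with conversation_history = ["hi", "Hello!"] and
    # input_text = "how are you?", generate_response assembles the prompt:
    #   USER: hi
    #   BOT: Hello!
    #   USER: how are you?
    #   BOT: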

    def process_input(self, input_text, conversation_history):
        # If the input mentions a remembered named entity, answer with the model
        named_entities = list(self.memory_module.get_named_entities())
        for entity in named_entities:
            if entity.lower() in input_text.lower():
                response = self.generate_response(input_text, conversation_history)
                self.memory_module.add_to_memory(response)
                return response

        # Check if the input contains a question
        if '?' in input_text:
            return "You're making me angry, you wouldn't like me when I'm angry."

        # Check if the input is a greeting
        greetings = ['hello', 'hi', 'hey', 'hola']
        for greeting in greetings:
            if greeting in input_text.lower():
                return "Hello! How can I assist you today?"

        # Check if the input is a statement about the model
        if self.name.lower() in input_text.lower():
            return "Yes, I am {}. How can I assist you today?".format(self.name)

        # Check if the input is a statement about the creator
        if 'creator' in input_text.lower():
            return "I was created by {}.".format(self.param1)

        # Check if the input is a sentiment analysis request
        if 'sentiment' in input_text.lower():
            sentiment = self.sentiment_module.analyze_sentiment(input_text)
            if sentiment == 'positive':
                return "The sentiment of the text is positive."
            elif sentiment == 'negative':
                return "The sentiment of the text is negative."
            else:
                return "The sentiment of the text is neutral."

        # Return a cached response if one is available
        cached_response = self.retrieve_cached_response(input_text)
        if cached_response:
            return cached_response

        # Generate a response with the GPT-Neo model
        response = self.generate_response(input_text, conversation_history)

        # Update the conversation history and cache the response
        conversation_history.append(input_text)
        conversation_history.append(response)
        self.response_cache[input_text] = response

        # Update memory with the generated response
        self.memory_module.add_to_memory(response)

        return response


common_module = CommonModule(model_directory, "Chatbot", "John Doe", "Jane Smith", load_memory_file, save_memory_file)
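
# A minimal, hypothetical usage sketch: process_input can also be driven with
# typed text instead of speech ("hello" and the empty history are illustrative
# test data). Left commented out so the speech loop below remains the entry point.
# demo_history = []
# print(common_module.process_input("hello", demo_history))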


def text_to_speech(text):
    common_module.speech_engine.say(text)
    common_module.speech_engine.runAndWait()


def exit_handler():
    common_module.reset_conversation()


atexit.register(exit_handler)

recognizer = sr.Recognizer()

# Maintain a single conversation history for the whole session; process_input
# appends each user input and bot response to it
conversation_history = []

while True:
    with sr.Microphone() as source:
        print("Listening...")
        audio = recognizer.listen(source)

    try:
        user_input = recognizer.recognize_google(audio)
        print("User:", user_input)
    except sr.UnknownValueError:
        print("Sorry, I could not understand your speech.")
        continue
    except sr.RequestError:
        print("Sorry, the speech recognition service is currently unavailable.")
        continue

    response = common_module.process_input(user_input, conversation_history)
    print("Bot:", response)
    text_to_speech(response)


MEMORY_MODULE

import json
import spacy

# Load the spaCy English model
nlp = spacy.load('en_core_web_sm')


class MemoryModule:
    def __init__(self, load_file, save_file):
        self.memory = []
        self.load_file = load_file
        self.save_file = save_file
        self.load_memory()

    def add_to_memory(self, statement):
        self.memory.append(statement)
        self.save_memory()

    def reset_memory(self):
        self.memory = []
        self.save_memory()

    def save_memory(self):
        with open(self.save_file, 'w') as file:
            json.dump(self.memory, file)

    def load_memory(self):
        try:
            with open(self.load_file, 'r') as file:
                loaded_memory = json.load(file)
            if isinstance(loaded_memory, list):
                self.memory = loaded_memory
            else:
                print("Loaded memory is not a list. Starting with an empty memory.")
        except FileNotFoundError:
            print("Load memory file not found. Starting with an empty memory.")
        except json.JSONDecodeError:
            print("Load memory file is not valid JSON. Starting with an empty memory.")

    def get_named_entities(self):
        # Collect the text of every named entity spaCy finds in the stored statements
        named_entities = set()
        for statement in self.memory:
            doc = nlp(statement)
            for entity in doc.ents:
                named_entities.add(entity.text)
        return named_entities


memory_module = MemoryModule(
    r"C:\Users\withe\PycharmProjects\no hope2\Chat_Bot4\load_memory.json",
    r"C:\Users\withe\PycharmProjects\no hope2\Chat_Bot4\save_memory.json"
)
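
# A minimal usage sketch (left commented out so importing this module does not
# write to the JSON files; the sentence is hypothetical test data):
# memory_module.add_to_memory("Alice met Bob in Paris.")
# print(memory_module.get_named_entities())  # e.g. {'Alice', 'Bob', 'Paris'}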


SENTIMENT_MODULE

from nltk.sentiment import SentimentIntensityAnalyzer


class SentimentAnalysisModule:
    def __init__(self):
        self.sia = SentimentIntensityAnalyzer()

    def analyze_sentiment(self, text):
        # VADER convention: compound >= 0.05 is positive, <= -0.05 is
        # negative, anything in between is neutral
        sentiment = self.sia.polarity_scores(text)
        compound_score = sentiment['compound']
        if compound_score >= 0.05:
            return 'positive'
        elif compound_score <= -0.05:
            return 'negative'
        else:
            return 'neutral'
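
# A minimal usage sketch of the module above (assumes the vader_lexicon
# resource has been downloaded, as the main script does):
if __name__ == '__main__':
    analyzer = SentimentAnalysisModule()
    print(analyzer.analyze_sentiment("I love this!"))    # expected: 'positive'
    print(analyzer.analyze_sentiment("This is awful."))  # expected: 'negative'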