import atexit
import json
import os
import sys

import nltk
import numpy as np
import pyttsx3
import spacy
import speech_recognition as sr
import torch
import transformers
from dotenv import load_dotenv  # assumes python-dotenv is installed for the .env file below
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import AutoTokenizer, GPTNeoForCausalLM, pipeline
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

print(torch.__version__)

model_path = spacy.util.get_package_path('en_core_web_sm')
print(model_path)

print("transformers version:", transformers.__version__)
print("spacy version:", spacy.__version__)
print("nltk version:", nltk.__version__)
sys.path.append(r"C:\Users\withe\PycharmProjects\no hope2\Gpt-Neo1")
# Download necessary NLTK resources
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('omw-1.4')
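# (punkt backs word_tokenize, stopwords feeds the TfidfVectorizer stop-word list,
# and wordnet/omw-1.4 back the WordNetLemmatizer used by MemoryModule below)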
# Load the spaCy model
nlp = spacy.load('en_core_web_sm')
# Define a text input
text = "Example text to process"
# Process the text using the nlp object
doc = nlp(text)
# Extract named entities from the processed text
named_entities = []
for entity in doc.ents:
    if entity.label_ in ['PERSON', 'ORG', 'GPE']:
        named_entities.append(entity.text)

# Print the extracted named entities
print(named_entities)
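# With text = "Example text to process" the list above is typically empty; a sentence
# such as "Tim Cook runs Apple in California" would yield entities like
# ['Tim Cook', 'Apple', 'California'] (PERSON, ORG, GPE). Exact output depends on the
# en_core_web_sm model version, so treat this as an illustrative expectation, not a fixture.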
# Load the API key from the environment file
# (assumes a python-dotenv style .env file; load_dotenv reads it into os.environ)
dotenv_path = './API_KEY.env'
load_dotenv(dotenv_path)
# Check if a GPU is available and set the device accordingly
# (plain 'cuda' selects the default GPU; the original hard-coded 'cuda:1',
# which fails on single-GPU machines)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Initialize the speech engine
speech_engine = pyttsx3.init()
# Get the list of available voices
voices = speech_engine.getProperty('voices')
for voice in voices:
    print(voice.id, voice.name)
# Set the desired voice. The hard-coded id below is the Windows registry token for
# "Microsoft Hazel Desktop - English (Great Britain)" and is machine-specific;
# the name-based lookup that follows is the portable approach and overrides it.
voice_id = r"HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Speech\Voices\Tokens\TTS_MS_EN-GB_HAZEL_11.0"
speech_engine.setProperty('voice', voice_id)

# Find the voice ID based on the desired voice name
desired_voice = "Microsoft Hazel Desktop - English (Great Britain)"
voice_id = None
for voice in voices:
    if desired_voice in voice.name:
        voice_id = voice.id
        break

if voice_id:
    speech_engine.setProperty('voice', voice_id)
    print("Desired voice set successfully.")
else:
    print("Desired voice not found.")
class CommonModule:
    def __init__(self, model, name, param1, param2):
        # Initialize the instance variables using the provided arguments
        self.model = model
        self.name = name
        self.param1 = param1
        self.param2 = param2
        self.tokenizer = AutoTokenizer.from_pretrained(model)  # Load the tokenizer
        self.tokenizer.add_special_tokens({'pad_token': '[PAD]'})
        self.gpt3_model = GPTNeoForCausalLM.from_pretrained('EleutherAI/gpt-neo-1.3B')
        self.gpt3_model.resize_token_embeddings(len(self.tokenizer))  # account for the added [PAD] token
        self.gpt3_model.to(device)  # Move the model to the device (GPU or CPU)
        # Build the text-generation pipeline once; the original rebuilt it (and reloaded
        # the 1.3B checkpoint) on every call to generate_gpt3_response()
        self.generator = pipeline(
            'text-generation',
            model=self.gpt3_model,
            tokenizer=self.tokenizer,
            device=0 if torch.cuda.is_available() else -1,
        )
        self.memory_module = MemoryModule()
        self.sentiment_module = SentimentAnalysisModule()
        self.speech_engine = speech_engine  # Assign the initialized speech engine

        self.max_sequence_length = 10  # Decrease the value for faster response
        self.num_beams = 4  # Reduce the value for faster response
        self.no_repeat_ngram_size = 2
        self.temperature = 0.3
        self.response_cache = {}  # Cache for storing frequently occurring responses
def reset_conversation(self):
self.memory_module.reset_memory()
    def retrieve_cached_response(self, input_text):
        # Exact-match lookup first (cache_response() stores responses keyed by input text,
        # so without this check the cache would never produce a hit)
        cached = self.response_cache.get(input_text)
        if cached is not None:
            return cached
        # Fall back to entity-keyed lookups for remembered names
        named_entities = self.memory_module.get_named_entities()
        for entity in named_entities:
            if entity.lower() in input_text.lower():
                return self.response_cache.get(entity)
        return None
def generate_gpt3_response(self, input_text, conversation_history, temperature=0.3):
prompt = '\n'.join(conversation_history) + '\n' + input_text + '\n'
        # Reuse the pipeline built in __init__ rather than reloading the model per call
        output = self.generator(
prompt,
do_sample=True,
min_length=50,
temperature=temperature,
num_return_sequences=1
)
        if output:
            # The pipeline output echoes the prompt; slice it off so only the new reply is returned
            generated_response = output[0]['generated_text'][len(prompt):].strip()
            return generated_response
return ""
def process_input(self, input_text, conversation_history):
named_entities = list(self.memory_module.get_named_entities())
for entity in named_entities:
if entity in input_text:
response = "Nice to meet you again, {}!".format(entity)
self.memory_module.add_to_memory(response)
return response
# Check if the input contains a question
if '?' in input_text:
return "You're making me angry, you wouldn't like me when I'm angry."
# Check if the input contains a keyword for memory search
if 'search' in input_text.lower():
keyword = input_text.lower().split('search ')[-1]
matches = self.memory_module.search_memory(keyword)
if matches:
return "I found some related information in the memory:\n" + '\n'.join(matches)
else:
return "Sorry, I couldn't find any relevant information in the memory."
# Retrieve the cached response
response = self.retrieve_cached_response(input_text)
if response is None:
response = self.generate_gpt3_response(input_text, conversation_history)
self.cache_response(input_text, response)
        named_entities = list(self.memory_module.get_named_entities())  # list() so it can be indexed
        if named_entities and any(entity in input_text for entity in named_entities):
            response = "Nice to meet you, {}! I'm still {}".format(named_entities[0], self.name)
self.memory_module.add_to_memory(response)
return response
self.memory_module.add_to_memory(response)
return response
def cache_response(self, input_text, response):
self.response_cache[input_text] = response
def speak(self, text, conversation_history=None):
if conversation_history is None:
conversation_history = []
conversation_history.append(text)
print(text)
self.speech_engine.say(text)
self.speech_engine.runAndWait()
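    # speak() appends to the caller's conversation_history in place, so converse()
    # below sees every spoken line accumulate in its shared history list.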
    def listen(self):
        recognizer = sr.Recognizer()
        with sr.Microphone() as source:
            print("Listening...")
            audio = recognizer.listen(source)
        try:
            user_input = recognizer.recognize_google(audio)
            print("You said:", user_input)
            return user_input
        except sr.UnknownValueError:
            print("Sorry, I could not understand your speech.")
        except sr.RequestError:
            print("Sorry, an error occurred while processing your request. Please try again.")
        # Return an empty string on either recognition failure
        return ""
def converse(self):
self.reset_conversation()
self.speak("Hey, what's up bro? I'm {}".format(self.name))
conversation_history = []
while True:
user_input = self.listen()
if user_input:
response = self.process_input(user_input, conversation_history)
self.speak(response, conversation_history)
# Check if the user input contains a named entity (name)
                named_entities = list(self.memory_module.get_named_entities())  # list() so it can be indexed
                if named_entities and any(entity in user_input for entity in named_entities):
                    self.speak("Nice to meet you, {}! I'm still {}".format(named_entities[0], self.name),
                               conversation_history)
conversation_history.append(user_input)
# Check if the conversation is over (you can define your own condition here)
if user_input == "bye":
                    self.save_memory(r'C:\Users\withe\PycharmProjects\no hope\Chat_Bot_Main\save_memory.json')
break
def save_memory(self, file_path):
data = {
'memory': self.memory_module.memory,
'named_entities': list(self.memory_module.named_entities) # Convert set to list
}
with open(file_path, 'w') as file:
json.dump(data, file)
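    # The saved JSON has the shape {"memory": [...], "named_entities": [...]};
    # load_memory_data() below expects exactly these two keys.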
def load_memory_data(self, memory_data):
self.memory_module.memory = memory_data['memory']
self.memory_module.named_entities = set(memory_data['named_entities'])
class MemoryModule:
    def __init__(self):
        self.memory = []
        self.vectorizer = TfidfVectorizer(stop_words=stopwords.words('english'))
        self.lemmatizer = WordNetLemmatizer()
        self.tokenizer = nltk.tokenize.word_tokenize
        self.named_entities = set()  # Set to store named entities like names
def get_named_entities(self):
return self.named_entities
def preprocess_text(self, text):
tokens = self.tokenizer(text.lower())
tokens = [self.lemmatizer.lemmatize(token) for token in tokens if token.isalnum()]
preprocessed_text = ' '.join(tokens)
return preprocessed_text
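    # Example: preprocess_text("The cats were running!") lowercases, tokenizes, drops the
    # punctuation, and noun-lemmatizes, yielding "the cat were running" (WordNetLemmatizer
    # defaults to pos='n', so verbs like "running" pass through unchanged).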
def add_to_memory(self, text):
preprocessed_text = self.preprocess_text(text)
self.memory.append(preprocessed_text)
# Update named entities if any
named_entity = self.extract_named_entity(text)
if named_entity:
self.named_entities.add(named_entity)
def extract_named_entity(self, text):
doc = nlp(text)
for entity in doc.ents:
if entity.label_ in ['PERSON', 'ORG', 'GPE']:
return entity.text
return None
    def search_memory(self, keyword):
        if not self.memory:
            return []
        preprocessed_keyword = self.preprocess_text(keyword)
        # Fit the vectorizer on the current memory before transforming; calling
        # transform() on an unfitted TfidfVectorizer raises NotFittedError
        vectorized_memory = self.vectorizer.fit_transform(self.memory)
        vectorized_keyword = self.vectorizer.transform([preprocessed_keyword])
        similarity_scores = (vectorized_memory @ vectorized_keyword.T).toarray().flatten()
        sorted_indices = np.argsort(similarity_scores)[::-1]
        matches = [self.memory[i] for i in sorted_indices if similarity_scores[i] > 0.5]
        return matches
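    # Note: TfidfVectorizer L2-normalizes rows by default, so the dot products in
    # search_memory() are cosine similarities in [0, 1]; 0.5 is a fairly strict threshold.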
def reset_memory(self):
self.memory = []
self.named_entities = set()
class SentimentAnalysisModule:
    def __init__(self):
        self.analyzer = SentimentIntensityAnalyzer()
def analyze_sentiment(self, text):
sentiment_scores = self.analyzer.polarity_scores(text)
return sentiment_scores
def get_sentiment_label(self, sentiment_scores):
compound_score = sentiment_scores['compound']
if compound_score >= 0.05:
return 'positive'
elif compound_score <= -0.05:
return 'negative'
else:
return 'neutral'
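# VADER convention: compound >= 0.05 reads as positive, <= -0.05 as negative, else neutral.
# For example, analyze_sentiment("I love this") returns a compound score around 0.6, which
# get_sentiment_label() maps to 'positive' (the exact value depends on the VADER lexicon version).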
# Define an exit handler function
def exit_handler(common_module):
    memory_data = {
        'memory': common_module.memory_module.memory,
        'named_entities': list(common_module.memory_module.named_entities)
    }
    common_module.save_memory(r'C:\Users\withe\PycharmProjects\pythonProject2\Chat_bot1\save_memory.json')
    print("Memory data saved successfully.")
    return memory_data
# Define a method to check if the load_memory.json file exists
def check_memory_file(file_path):
    return os.path.isfile(file_path)
if __name__ == "__main__":
    model = 'EleutherAI/gpt-neo-1.3B'
    name = "Chat bot1"
    param1 = 'value1'
    param2 = 'value2'
    common_module = CommonModule(model, name, param1, param2)
memory_file_path = 'C:\\Users\\withe\\PycharmProjects\\pythonProject2\\Chat_bot1\\load_memory1.json'
if check_memory_file(memory_file_path):
with open(memory_file_path, 'r') as file:
memory_data = json.load(file)
common_module.load_memory_data(memory_data)
atexit.register(exit_handler, common_module)
common_module.converse()
    # Generate a response using the text-generation pipeline
    # (the original called collections.abc.Generator, which is an abstract base class,
    # not a generation function; reuse the pipeline already built inside CommonModule)
    prompt = "EleutherAI has"
    generated_text = common_module.generator(prompt, do_sample=True, min_length=50)
    if generated_text:
        generated_response = generated_text[0]['generated_text'].strip()
        print(generated_response)
common_module.save_memory(memory_file_path)