import atexit
import json
import os
import sys

import nltk
import numpy as np
import pyttsx3
import spacy
import speech_recognition as sr
import torch
import transformers
from dotenv import load_dotenv
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import AutoTokenizer, GPTNeoForCausalLM, pipeline
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

print(torch.__version__)
model_path = spacy.util.get_package_path('en_core_web_sm')
print(model_path)
print("transformers version:", transformers.__version__)
print("spacy version:", spacy.__version__)
print("nltk version:", nltk.__version__)

sys.path.append(r"C:\Users\withe\PycharmProjects\no hope2\Gpt-Neo1")

# Download the necessary NLTK resources
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('omw-1.4')

# Load the spaCy model
nlp = spacy.load('en_core_web_sm')

# Quick smoke test: process a sample text and extract named entities
text = "Example text to process"
doc = nlp(text)

named_entities = []
for entity in doc.ents:
    if entity.label_ in ['PERSON', 'ORG', 'GPE']:
        named_entities.append(entity.text)

print(named_entities)

# Load the API key from the environment file
dotenv_path = './API_KEY.env'
load_dotenv(dotenv_path)

# Check if a GPU is available and set the device accordingly
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialize the speech engine
speech_engine = pyttsx3.init()

# List the available voices
voices = speech_engine.getProperty('voices')
for voice in voices:
    print(voice.id, voice.name)

# Select the desired voice by name; the raw registry token
# (HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Speech\Voices\Tokens\TTS_MS_EN-GB_HAZEL_11.0)
# only works on machines where Hazel is installed at that exact key.
desired_voice = "Microsoft Hazel Desktop - English (Great Britain)"
voice_id = None
for voice in voices:
    if desired_voice in voice.name:
        voice_id = voice.id
        break

if voice_id:
    speech_engine.setProperty('voice', voice_id)
    print("Desired voice set successfully.")
else:
    print("Desired voice not found.")


class CommonModule:
    def __init__(self, model, name, param1, param2):
        # Initialize the instance variables using the provided arguments
        self.model = model
        self.name = name
        self.param1 = param1
        self.param2 = param2
        self.tokenizer = AutoTokenizer.from_pretrained(model)  # Load the tokenizer
        # GPT-Neo ships without a pad token; reuse the EOS token instead of adding
        # a new '[PAD]' token, which would require resizing the model embeddings.
        self.tokenizer.pad_token = self.tokenizer.eos_token
        self.gpt3_model = GPTNeoForCausalLM.from_pretrained('EleutherAI/gpt-neo-1.3B')
        self.gpt3_model.to(device)  # Move the model to the device (GPU or CPU)
        # Build the text-generation pipeline once, instead of reloading the
        # 1.3B model on every call to generate_gpt3_response()
        self.generator = pipeline(
            'text-generation',
            model=self.gpt3_model,
            tokenizer=self.tokenizer,
            device=0 if torch.cuda.is_available() else -1,
        )
        self.memory_module = MemoryModule()
        self.sentiment_module = SentimentAnalysisModule()
        self.speech_engine = speech_engine  # Reuse the initialized speech engine
        self.max_sequence_length = 10  # Lower value for faster responses
        self.num_beams = 4  # Lower value for faster responses
        self.no_repeat_ngram_size = 2
        self.temperature = 0.3
        self.response_cache = {}  # Cache for frequently occurring responses

    def reset_conversation(self):
        self.memory_module.reset_memory()
    def retrieve_cached_response(self, input_text):
        named_entities = self.memory_module.get_named_entities()
        for entity in named_entities:
            if entity.lower() in input_text.lower():
                return self.response_cache.get(entity)
        return None

    def generate_gpt3_response(self, input_text, conversation_history, temperature=0.3):
        prompt = '\n'.join(conversation_history) + '\n' + input_text + '\n'
        output = self.generator(
            prompt,
            do_sample=True,
            min_length=50,
            temperature=temperature,
            num_return_sequences=1
        )
        if output:
            # Note: 'generated_text' includes the prompt itself
            return output[0]['generated_text'].strip()
        return ""

    def process_input(self, input_text, conversation_history):
        named_entities = list(self.memory_module.get_named_entities())
        for entity in named_entities:
            if entity in input_text:
                response = "Nice to meet you again, {}!".format(entity)
                self.memory_module.add_to_memory(response)
                return response

        # Check if the input contains a question
        if '?' in input_text:
            return "You're making me angry, you wouldn't like me when I'm angry."

        # Check if the input contains a keyword for memory search
        if 'search' in input_text.lower():
            keyword = input_text.lower().split('search ')[-1]
            matches = self.memory_module.search_memory(keyword)
            if matches:
                return "I found some related information in the memory:\n" + '\n'.join(matches)
            else:
                return "Sorry, I couldn't find any relevant information in the memory."

        # Fall back to the cache, then to GPT-Neo
        response = self.retrieve_cached_response(input_text)
        if response is None:
            response = self.generate_gpt3_response(input_text, conversation_history)
            self.cache_response(input_text, response)

        # Convert the set to a list so it can be indexed
        named_entities = list(self.memory_module.get_named_entities())
        if named_entities and any(entity in input_text for entity in named_entities):
            response = "Nice to meet you, {}! I'm still {}".format(named_entities[0], self.name)
            self.memory_module.add_to_memory(response)
            return response

        self.memory_module.add_to_memory(response)
        return response

    def cache_response(self, input_text, response):
        self.response_cache[input_text] = response

    def speak(self, text, conversation_history=None):
        if conversation_history is None:
            conversation_history = []
        conversation_history.append(text)
        print(text)
        self.speech_engine.say(text)
        self.speech_engine.runAndWait()

    def listen(self):
        recognizer = sr.Recognizer()
        with sr.Microphone() as source:
            print("Listening...")
            audio = recognizer.listen(source)
        try:
            user_input = recognizer.recognize_google(audio)
            print("You said:", user_input)
            return user_input
        except sr.UnknownValueError:
            print("Sorry, I could not understand your speech.")
        except sr.RequestError:
            print("Sorry, an error occurred while processing your request. Please try again.")
        return ""

    def converse(self):
        self.reset_conversation()
        self.speak("Hey, what's up bro? I'm {}".format(self.name))
        conversation_history = []
        while True:
            user_input = self.listen()
            if user_input:
                response = self.process_input(user_input, conversation_history)
                self.speak(response, conversation_history)

                # Check if the user input contains a known named entity (name)
                named_entities = list(self.memory_module.get_named_entities())
                if named_entities and any(entity in user_input for entity in named_entities):
I'm still {}".format(named_entities[0], self.name), conversation_history) conversation_history.append(user_input) # Check if the conversation is over (you can define your own condition here) if user_input == "bye": self.save_memory('C:\\Users\\withe\PycharmProjects\\no hope\\Chat_Bot_Main\\save_memory.json') break def save_memory(self, file_path): data = { 'memory': self.memory_module.memory, 'named_entities': list(self.memory_module.named_entities) # Convert set to list } with open(file_path, 'w') as file: json.dump(data, file) def load_memory_data(self, memory_data): self.memory_module.memory = memory_data['memory'] self.memory_module.named_entities = set(memory_data['named_entities']) class MemoryModule: def __init__(self): self.memory = [] self.vectorizer = TfidfVectorizer(stop_words=stopwords.words('english')) self.lemmatizer = WordNetLemmatizer() self.tokenizer = nltk.tokenize.word_tokenize self.named_entities = set() # Set to store named entities like names def get_named_entities(self): return self.named_entities def preprocess_text(self, text): tokens = self.tokenizer(text.lower()) tokens = [self.lemmatizer.lemmatize(token) for token in tokens if token.isalnum()] preprocessed_text = ' '.join(tokens) return preprocessed_text def add_to_memory(self, text): preprocessed_text = self.preprocess_text(text) self.memory.append(preprocessed_text) # Update named entities if any named_entity = self.extract_named_entity(text) if named_entity: self.named_entities.add(named_entity) def extract_named_entity(self, text): doc = nlp(text) for entity in doc.ents: if entity.label_ in ['PERSON', 'ORG', 'GPE']: return entity.text return None def search_memory(self, keyword): preprocessed_keyword = self.preprocess_text(keyword) vectorized_memory = self.vectorizer.transform(self.memory) vectorized_keyword = self.vectorizer.transform([preprocessed_keyword]) similarity_scores = np.dot(vectorized_memory, vectorized_keyword.T).toarray().flatten() sorted_indices = np.argsort(similarity_scores)[::-1] matches = [self.memory[i] for i in sorted_indices if similarity_scores[i] > 0.5] return matches def reset_memory(self): self.memory = [] self.named_entities = set() class SentimentAnalysisModule: def __init__(self): self.analyzer = SentimentIntensityAnalyzer() def analyze_sentiment(self, text): sentiment_scores = self.analyzer.polarity_scores(text) return sentiment_scores def get_sentiment_label(self, sentiment_scores): compound_score = sentiment_scores['compound'] if compound_score >= 0.05: return 'positive' elif compound_score <= -0.05: return 'negative' else: return 'neutral' # Define an exit handler function def exit_handler(common_module): memory_data = { 'memory': common_module.memory_module.memory, 'named_entities': list(common_module.memory_module.named_entities) } common_module.save_memory('C:\\Users\\withe\\PycharmProjects\\pythonProject2\\Chat_bot1\\save_memory.json') print("Memory data saved successfully.") return memory_data # Define a method to check if the load_memory.json file exists def check_memory_file(file_path): return os.path.isfile(file_path) if __name__ == "__main__": model = 'EleutherAI/gpt-neo-1.3B' name = "Chat bot1" param1 = 'value1' param2 = 'value2' common_module = CommonModule(model, name, param1, param2) memory_file_path = 'C:\\Users\\withe\\PycharmProjects\\pythonProject2\\Chat_bot1\\load_memory1.json' if check_memory_file(memory_file_path): with open(memory_file_path, 'r') as file: memory_data = json.load(file) common_module.load_memory_data(memory_data) 
    atexit.register(exit_handler, common_module)
    common_module.converse()

    # Generate a sample response using the shared pipeline
    prompt = "EleutherAI has"
    generated_text = common_module.generator(prompt, do_sample=True, min_length=50)
    if generated_text:
        generated_response = generated_text[0]['generated_text'].strip()
        print(generated_response)

    common_module.save_memory(memory_file_path)
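

# --- Optional, illustrative sketch (not wired into the chat loop above) ---
# A minimal standalone exercise of MemoryModule, useful for sanity-checking the
# TF-IDF search: add_to_memory() stores preprocessed text and records named
# entities via spaCy, and search_memory() fits TF-IDF over the stored texts and
# ranks them by cosine similarity. The sample sentences are hypothetical.
def demo_memory_search():
    memory = MemoryModule()
    memory.add_to_memory("Alice works at Acme Corp in London.")
    memory.add_to_memory("The weather in Paris was sunny yesterday.")
    print(memory.get_named_entities())               # expected to include 'Alice' (spaCy-dependent)
    print(memory.search_memory("weather in Paris"))  # should return the second (preprocessed) entry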