# This is a quote-and-post library for a specific thread in the WarOnline forum.

import WarClient
import conversationDB
import requests
import re
from bs4 import BeautifulSoup
import urllib.request as urllib
import warnings
import time
import config  # Here the constants are stored

warnings.filterwarnings("ignore")

# Start a session to persist the login cookie across requests
session = requests.Session()

# Ordered (old, new) substitutions applied by fixString().
# The order is significant: a later rule can match text produced by an earlier
# one (e.g. "? ." becomes "?." via the " ." rule and is then caught by the
# second "?." rule), so do not sort or de-duplicate this table blindly.
_FIX_REPLACEMENTS = (
    (",+", ","),
    ("!.", "!"),
    (".?", "?"),
    (",,", ","),
    ("?.", "?"),
    ("??", "?"),
    (" ?", "?"),
    (" .", "."),
    (",!", "!"),
    (",.", ","),
    (".]", "."),
    # Comma + literal backslash + ")". This is exactly what the previous
    # unescaped literal matched, now spelled with a valid escape.
    # NOTE(review): plain ",)" was probably intended - confirm.
    (",\\)", ")"),
    ("&", ""),
    ("ен,ицхак", "ен-ицхак"),
    ("СШа", "США"),
    ("(,", "("),
    ("?.", "?"),
    ("#", ""),
    ("()", ""),
)

# Everything except Latin/Cyrillic letters, basic punctuation and whitespace
_NON_EN_RU_PATTERN = re.compile(r'[^A-Za-zА-Яа-яЁё(),.!?"\s-]')

# Patterns used to pull the metadata out of the HTML of a single forum message
_NAME_PATTERN = re.compile('data-lb-caption-desc="(.*?) ·')
_MESSAGE_ID_PATTERN = re.compile('data-lb-id="(.*?)"')
_QUOTED_ID_PATTERN = re.compile('data-source="(.*?)"')
_QUOTED_NAME_PATTERN = re.compile('data-quote="(.*?)"')

# How many times the model is asked for a reply before the message is skipped
_MAX_REPLY_ATTEMPTS = 3


def fixString(S: str) -> str:
    """Clean up punctuation artefacts left in a string by the tokenizer.

    Applies the substitutions of ``_FIX_REPLACEMENTS`` in order, then strips
    leading/trailing commas and finally surrounding whitespace.
    """
    for old, new in _FIX_REPLACEMENTS:
        S = S.replace(old, new)
    return S.strip(',').strip()


def compare_pages(url1: str, url2: str) -> bool:
    """Return True if both URLs resolve to the same final URL.

    Both URLs are opened and the URLs reported by ``geturl()`` are compared.
    The responses are closed on exit (previously they were left open).
    """
    with urllib.urlopen(url1) as first, urllib.urlopen(url2) as second:
        return first.geturl() == second.geturl()


def remove_non_english_russian_chars(s: str) -> str:
    """Drop every character that is not an English/Russian letter, one of
    ``( ) , . ! ? " -`` or whitespace."""
    return _NON_EN_RU_PATTERN.sub('', s)


def remove_extra_spaces(s: str) -> str:
    """Collapse whitespace runs and remove whitespace before ``.``, ``,``, ``-``."""
    s = re.sub(r"\s+", " ", s)  # replace all sequences of whitespace with a single space
    s = re.sub(r"\s+([.,-])", r"\1", s)  # remove spaces before period, dash or comma
    return s


def getLastPage(thread_url=config.thread_url) -> int:
    """Return the number of the last page of the thread (slow variant).

    Walks forward from page 1 until ``compare_pages`` reports that page N and
    page N+1 resolve to the same URL (presumably the forum redirects an
    out-of-range page number to the last existing page).
    """
    print('looking for the last page of the thread')
    page = 1  # Starting page
    while not compare_pages(thread_url + 'page-' + str(page),
                            thread_url + 'page-' + str(page + 1)):
        page += 1
    print('Last page of this thread is '+str(page))
    return page


def getLastPage2(thread_url=config.thread_url) -> int:
    """Return the number of the last page of the thread (fast variant).

    Reads the last ``li.pageNav-page`` element of the thread's first page.

    Returns:
        The last page number, or 1 when the page has no page-navigation items.

    Raises:
        ConnectionError: the thread page did not answer with HTTP 200.
            (Previously this case fell through to an unbound local variable.)
    """
    print('looking for the last page of the thread')
    response = requests.get(thread_url)
    if response.status_code != 200:
        raise ConnectionError(
            f'Could not load {thread_url}: HTTP {response.status_code}')

    # Parse the HTML content using BeautifulSoup
    soup = BeautifulSoup(response.content, 'html.parser')

    # Find last page number; no navigation items means there is only one page
    page_items = soup.find_all('li', {'class': 'pageNav-page'})
    page = int(page_items[-1].getText()) if page_items else 1

    print('Last page of this thread is ' + str(page))
    return page


def login(username=config.username, password=config.password, thread_url=config.thread_url) -> None:
    """Log in to the forum using the shared ``session`` and redirect to the thread.

    Raises:
        SystemExit: the response contains 'Invalid login'.
    """
    # Retrieve the login page HTML to get the CSRF token
    login_page_response = session.get(config.login_url)
    soup = BeautifulSoup(login_page_response.text, 'html.parser')
    csrf_token = soup.find('input', {'name': '_xfToken'})['value']

    # Login to the website
    login_data = {
        'login': username,
        'password': password,
        'remember': '1',
        '_xfRedirect': thread_url,
        '_xfToken': csrf_token
    }
    response = session.post(config.login_url, data=login_data)

    # Check if the login was successful
    if 'Invalid login' in response.text:
        print('Login failed!')
        # Same exception the old `exit()` call raised, without relying on the
        # interactive helper injected by the `site` module.
        raise SystemExit
    print('Login successful')


def post(message="", thread_url=config.thread_url, post_url=config.post_url,
         quoted_by="", quote_text="", quote_source="", img_url="") -> None:
    """Post a message to the forum, with or without a quote.

    Args:
        message: Text of the reply. Ignored when both ``quoted_by`` and
            ``img_url`` are given (the image block replaces it).
        thread_url: Thread page, fetched to obtain the form token.
        post_url: URL the reply form is submitted to.
        quoted_by: Name of the quoted user; empty means "no quote".
        quote_text: Text placed inside the quote block.
        quote_source: Quoted post ID in the format 'post-3920992'.
        img_url: URL of an image to embed instead of a text reply.

    Raises:
        SystemExit: the forum did not accept the post.
    """
    quote_source = quote_source.split('-')[-1]  # Take the numbers only

    if quoted_by:
        if img_url:  # It is an image
            message = f'Примерно вот так: \n[IMG]{img_url}[/IMG]'  # Set the image block
        message = f'[QUOTE="{quoted_by}, post: {quote_source}"]{quote_text}[/QUOTE]{message}'

    # Retrieve the thread page HTML
    response = session.get(thread_url)

    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(response.text, 'html.parser')

    # Extract the _xfToken value from the hidden form field
    xf_token = soup.find('input', {'name': '_xfToken'}).get('value')

    # Construct the message data for the POST request
    message_data = {
        '_xfToken': xf_token,
        'message': message,
        'attachment_hash': '',
        'last_date': '',
        '_xfRequestUri': post_url,
        '_xfWithData': '1',
        '_xfResponseType': 'json'
    }

    response = session.post(post_url, data=message_data)

    # Check if the post was successful
    if not response.ok:
        print('Post failed!')
        raise SystemExit  # same exception the old `exit()` call raised

    print('Post submitted successfully.')


def _first_group(pattern, html: str) -> str:
    """Return group 1 of the first match of *pattern* in *html*, or ''."""
    match = pattern.search(html)
    return match.group(1) if match else ""


def _parse_message(data, quotedUser: str):
    """Turn one message element into a record dict, or None if it is not relevant.

    A message is relevant when it contains a quote (optionally restricted to
    quotes of *quotedUser*) or when its text starts with '@WarBot'.
    """
    html = str(data)
    # Every field is extracted from THIS message only, so nothing leaks over
    # from the previously parsed post when an attribute is missing.
    messengerName = _first_group(_NAME_PATTERN, html)
    quotedID = _first_group(_QUOTED_ID_PATTERN, html)
    messageID = _first_group(_MESSAGE_ID_PATTERN, html)
    quotedName = _first_group(_QUOTED_NAME_PATTERN, html)

    if quotedUser and quotedUser != quotedName:
        return None

    text = data.find('div', {'class': 'bbWrapper'})
    if text is None:
        return None  # There was no text in this message

    # Make sure that the message has a quote inside
    if data.find('blockquote') is not None:
        # Extract the text without the quoted parts
        for bq in text.find_all('blockquote'):
            bq.extract()
        reply = text.get_text().replace('\n', ' ').strip()
        return {'reply': reply, 'messengerName': messengerName,
                'messageID': messageID, 'quotedID': quotedID}

    # Looking for a direct message "@WarBot"
    body = text.get_text()
    if body.startswith('@WarBot'):
        reply = body.replace('@WarBot', '').replace('\n', ' ').strip()
        return {'reply': reply, 'messengerName': messengerName,
                'messageID': messageID, 'quotedID': 'post: 0'}

    return None


def getMessages(thread_url=config.thread_url, quotedUser="", startingPage=1) -> list:
    """Collect quoting and direct ('@WarBot') messages from a multi-page thread.

    Args:
        thread_url: Base URL of the thread; 'page-N' is appended to it.
        quotedUser: When non-empty, keep only messages quoting this user.
        startingPage: First page to scan; scanning continues to the last page.

    Returns:
        A list of dicts with the keys 'reply', 'messengerName', 'messageID'
        and 'quotedID' ('post: 0' marks a direct message).
    """
    allquotes = []
    page = startingPage

    while True:
        response = requests.get(thread_url + 'page-' + str(page))
        if response.status_code != 200:
            break

        # Parse the HTML content using BeautifulSoup
        soup = BeautifulSoup(response.content, 'html.parser')

        # Find all the messages in the thread page
        messageData = soup.find_all(
            'div', {'class': 'message-userContent lbContainer js-lbContainer'})

        for data in messageData:
            try:
                record = _parse_message(data, quotedUser)
            except Exception:
                # Best effort: a message that cannot be parsed is skipped.
                # (Unlike a bare `except`, this lets Ctrl-C through.)
                continue
            if record is not None:
                allquotes.append(record)

        # Check if that is not the last page
        if compare_pages(thread_url + 'page-' + str(page),
                         thread_url + 'page-' + str(page + 1)):
            break
        page += 1

    return allquotes


def _request_reply(quote: str) -> str:
    """Ask the model for a reply, retrying a bounded number of times.

    Returns '' when no usable reply was produced within
    ``_MAX_REPLY_ATTEMPTS`` attempts.
    """
    for _ in range(_MAX_REPLY_ATTEMPTS):
        message = WarClient.getReply(message=quote)
        # Strange error in message if there is '02' in the message text.
        if '02' in message:
            message = ""
        if message:
            return message
    return ""


def WarOnlineBot() -> int:
    """Run one session of the bot: find unanswered messages and reply to them.

    Returns:
        0 when the session completed, -1 when any step failed (the failure is
        reported as 'Bad Connection' and the caller is expected to retry).
    """
    try:
        # Try logging in
        login(username=config.username, password=config.password, thread_url=config.thread_url)

        lookUpPages = 5  # How many pages back to look in the thread
        # Starting page cannot be less than 1
        startingPage = max(1, getLastPage2(thread_url=config.thread_url) - lookUpPages)

        # All messages (with quotes) by ALL users:
        allMessages = getMessages(thread_url=config.thread_url, quotedUser='', startingPage=startingPage)

        # IDs of the quoted messages, replied by the bot:
        messages_by_bot_IDs = set()
        # Direct messages to the bot:
        direct_messages = []

        for msg in allMessages:
            quoted_post_ID = msg['quotedID'].split(': ')[-1]
            if quoted_post_ID == '0':  # Direct message to the bot
                direct_messages.append(msg)
            if msg['messengerName'] == config.username:  # Message posted by the WarBot
                messages_by_bot_IDs.add(quoted_post_ID)
        messages_by_bot_IDs.discard('')  # remove the empty element

        # All messages (with quotes) sent _FOR_ the Bot:
        messagesForBot = getMessages(thread_url=config.thread_url, quotedUser=config.username, startingPage=startingPage)

        # Append the direct messages to the messagesForBot:
        messagesForBot.extend(direct_messages)

        # IDs of the messages addressed to the bot that are still unanswered
        # (empty IDs are dropped):
        messages_for_bot_IDs = {
            ID
            for ID in (msg['messageID'].split('-')[-1] for msg in messagesForBot)
            if ID and ID not in messages_by_bot_IDs
        }

        # Reply the unanswered messages:
        for msg in messagesForBot:
            if msg['messageID'].split('-')[-1] not in messages_for_bot_IDs:
                continue

            originalQuote = msg['reply']
            if originalQuote == "":  # Just images, no text
                continue

            quote = remove_non_english_russian_chars(originalQuote)
            quote = remove_extra_spaces(quote)

            print('Quote: ', originalQuote)

            # Init Connection
            db = conversationDB.DataBase()

            if msg['quotedID'].split(': ')[-1] != '0':
                # It is a dialogue: prepend the previous exchanges with this
                # user, taken from the database, and preprocess the result.
                dbmessages = db.getmessages(msg['messengerName'])
                previous_dialogue = "".join(
                    dbmessage[0] + ' ' + dbmessage[1] + ' ' for dbmessage in dbmessages)
                quote = previous_dialogue + quote
                quote = remove_non_english_russian_chars(quote)
                quote = remove_extra_spaces(quote)

            # Truncate the quote to return only the last MaxWords of words:
            quote = " ".join(quote.split()[-config.MaxWords:])

            # Fix the quote string, to eliminate errors:
            quote = fixString(quote)

            message = _request_reply(quote)
            if not message:
                continue  # The model produced nothing usable - skip that answer

            # Post-processing fixes:
            message = fixString(message)
            print('Reply: ', message)

            if message.endswith('.png'):  # It is an image reply
                # Post an image reply
                # (will not be added to the database, if image is a reply):
                login(username=config.username, password=config.password, thread_url=config.thread_url)
                time.sleep(1)
                post(message="", thread_url=config.thread_url, post_url=config.post_url,
                     quoted_by=msg['messengerName'], quote_text=originalQuote,
                     quote_source=msg['messageID'], img_url=message)
            else:
                # Add the new conversation pair to the database
                db.setmessages(username=msg['messengerName'], message_text=originalQuote, bot_reply=message)
                # Clean up the excessive records, leaving only the remaining messages
                db.cleanup(username=msg['messengerName'], remaining_messages=config.remaining_messages)
                # Delete the duplicate records
                db.deleteDuplicates()

                login(username=config.username, password=config.password, thread_url=config.thread_url)
                time.sleep(1)
                post(message=message, thread_url=config.thread_url, post_url=config.post_url,
                     quoted_by=msg['messengerName'], quote_text=originalQuote,
                     quote_source=msg['messageID'])

            time.sleep(10)  # Standby time for server load release

        return 0  # All is good

    except (Exception, SystemExit):
        # login()/post() report failure with SystemExit; it is caught together
        # with ordinary errors so that the scheduler retries the session.
        # KeyboardInterrupt is deliberately NOT caught, so Ctrl-C stops the bot.
        print('Bad Connection')
        return -1  # Error


if __name__ == '__main__':
    # Start the scheduler
    while True:
        print('Starting Session')
        result = WarOnlineBot()

        # Debug Only:
        # imgWord = 'как выглядит'
        # login(username=config.username, password=config.password, thread_url=config.thread_url)
        # print("logged in")
        # post(message="", thread_url=config.thread_url, post_url=config.post_url, quoted_by='Test', quote_text="posting an image",img_url='https://replicate.delivery/pbxt/knKBiJt8DPZ0B1o25PaLJSZjgv3D5HcwLoBIn0JESbe3nISIA/out-0.png')

        if result == 0:  # Good result
            print('Session finished. Timeout...')
            # Sleep in one-second steps for config.timeout minutes
            for _ in range(60 * config.timeout):
                time.sleep(1)
        else:
            # Rerunning Session
            time.sleep(10)  # Standby time for server load release