File size: 14,832 Bytes
10a95dd dd2dd36 a36bc66 71acbff dd2dd36 00d90ae dd2dd36 3291ddb d7e9769 71acbff 3152d9e 71acbff dd2dd36 cf8e5a9 28a6a88 4e54373 5f576b1 4e54373 71acbff 4e54373 4c41d5e 4e54373 4c41d5e cf8e5a9 3291ddb ef02423 4c41d5e ef02423 4c41d5e 92b9c00 4c41d5e 8f30cba 4c41d5e 71acbff 4c41d5e 92b9c00 4c41d5e 92b9c00 4c41d5e 71acbff 10a95dd 71acbff 10a95dd 71acbff 10a95dd 92b9c00 10a95dd 8f30cba 3291ddb c981d80 3291ddb 8f30cba 3291ddb 10a95dd 71acbff 4c3086f 00d90ae 4c3086f 10a95dd 00d90ae ef02423 00d90ae 4c3086f ef02423 10a95dd 4c3086f 10a95dd 905f4ca 4c3086f 10a95dd 4c3086f ef02423 00d90ae 4c3086f ef02423 71acbff ef02423 768ac34 ef02423 768ac34 ef02423 8aafba3 4c3086f 10a95dd 4c3086f 10a95dd d7e9769 92b9c00 905f4ca 92b9c00 ef02423 92b9c00 d7e9769 71acbff 92b9c00 8f30cba 92b9c00 905f4ca 92b9c00 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 |
# This is a quote and post library for a specific thread in the WarOnline forum.
import WarClient
import conversationDB
import requests
import re
from bs4 import BeautifulSoup
import urllib.request as urllib
import warnings
import time
import config # Here the constants are stored
# Suppress every Python warning for the whole process (no category filter is given).
warnings.filterwarnings("ignore")
# Start a session to persist the login cookie across requests.
# This module-level session is shared by login() and post().
session = requests.Session()
def fixString(S):
    """Clean up punctuation artifacts left in a string by the tokenizer.

    The replacements are applied strictly in the listed order, because an
    earlier replacement can create a pattern that a later one has to clean up
    (this is why "?." appears twice). Finally, commas are stripped from both
    ends and then surrounding whitespace is stripped.

    Args:
        S: The string to clean.

    Returns:
        The cleaned string.
    """
    replacements = (
        (",+", ","),
        ("!.", "!"),
        (".?", "?"),
        (",,", ","),
        ("?.", "?"),
        ("??", "?"),
        (" ?", "?"),
        (" .", "."),
        (",!", "!"),
        (",.", ","),
        (".]", "."),
        # The original literal was ",\)" (comma, backslash, parenthesis);
        # spelled with an explicit backslash to avoid an invalid escape sequence.
        (",\\)", ")"),
        ("&", ""),
        ("ен,ицхак", "ен-ицхак"),
        ("СШа", "США"),
        ("(,", "("),
        # Mirror of the "(," rule: drop a dangling comma before ")".
        (",)", ")"),
        # Repeated on purpose: " ." -> "." above may have produced a new "?.".
        ("?.", "?"),
        ("#", ""),
        ("()", ""),
    )
    for old, new in replacements:
        S = S.replace(old, new)
    # Commas are stripped first, whitespace second (the order is significant).
    return S.strip(',').strip()
def compare_pages(url1, url2):
    """Return True if both URLs resolve to the same final URL.

    Each URL is opened and the URL of the retrieved resource (after any
    redirects) is compared. Both responses are closed before returning, so
    repeated calls do not leak open connections.

    Args:
        url1: First URL to open.
        url2: Second URL to open.

    Returns:
        True when the final URLs are equal, False otherwise.
    """
    with urllib.urlopen(url1) as first, urllib.urlopen(url2) as second:
        return first.geturl() == second.geturl()
def remove_non_english_russian_chars(s):
    """Remove every character that is not English/Russian text or basic punctuation.

    Kept characters: Latin and Cyrillic letters (including Ё/ё), the
    punctuation marks ( ) , . ! ? " -, and whitespace.

    Args:
        s: The string to filter.

    Returns:
        The string with all other characters deleted.
    """
    # Raw string so that "\s" reaches the regex engine as written instead of
    # being an invalid string escape sequence.
    pattern = r'[^A-Za-zА-Яа-яЁё(),.!?"\s-]'
    return re.sub(pattern, '', s)
def remove_extra_spaces(s):
    """Normalise whitespace in a string.

    Every run of whitespace becomes a single space, then any whitespace
    directly before a period, comma or dash is removed.
    """
    whitespace_run = re.compile(r"\s+")
    space_before_punct = re.compile(r"\s+([.,-])")
    collapsed = whitespace_run.sub(" ", s)
    # Keep only the punctuation mark, dropping the whitespace in front of it.
    return space_before_punct.sub(lambda found: found.group(1), collapsed)
def getLastPage(thread_url=config.thread_url):
    """Find the number of the last page of the thread.

    Starting from page 1, advances while page N and page N+1 resolve to
    different URLs according to compare_pages(); the first N for which both
    resolve to the same URL is returned.
    (Presumably the forum redirects a past-the-end page to the last page.)
    """
    def page_url(number):
        return thread_url + 'page-' + str(number)

    print('looking for the last page of the thread')
    page = 1  # Starting page
    while not compare_pages(page_url(page), page_url(page + 1)):
        page += 1
    print('Last page of this thread is ' + str(page))
    return page
def login(username=config.username, password=config.password, thread_url=config.thread_url):
    """Log in to the forum using the shared module-level session.

    Fetches the login page to read the `_xfToken` CSRF token from its hidden
    input, then posts the credentials with `thread_url` as the redirect target.

    Args:
        username: Forum login name.
        password: Forum password.
        thread_url: URL sent as `_xfRedirect` with the login form.

    Raises:
        SystemExit: If the response text contains 'Invalid login'.
        TypeError: If the login page has no `_xfToken` input.
    """
    # Retrieve the login page HTML to get the CSRF token
    login_page_response = session.get(config.login_url)
    soup = BeautifulSoup(login_page_response.text, 'html.parser')
    csrf_token = soup.find('input', {'name': '_xfToken'})['value']

    login_data = {
        'login': username,
        'password': password,
        'remember': '1',
        '_xfRedirect': thread_url,
        '_xfToken': csrf_token
    }
    response = session.post(config.login_url, data=login_data)

    # Check if the login was successful
    if 'Invalid login' in response.text:
        print('Login failed!')
        # The `exit()` helper comes from the `site` module and is meant for
        # the interactive shell; raise SystemExit directly, with a failure status.
        raise SystemExit(1)
    print('Login successful')
def post(message="", thread_url=config.thread_url, post_url=config.post_url, quoted_by="", quote_text="", quote_source="", img_url=""):
    """Post a message to the forum thread, with or without a quote.

    Args:
        message: Text of the reply. Replaced by an image block when both
            `quoted_by` and `img_url` are given.
        thread_url: Thread page fetched to read the `_xfToken` form token.
        post_url: URL that receives the POST request.
        quoted_by: Name of the quoted user; when empty, no quote block is
            added and `img_url` is ignored.
        quote_text: Text placed inside the quote block.
        quote_source: ID of the quoted post in the format 'post-3920992';
            only the part after the last '-' is used.
        img_url: URL of an image to post as the reply body.

    Raises:
        SystemExit: If the POST response is not OK.
    """
    quote_source = quote_source.split('-')[-1]  # Take the numbers only
    if quoted_by:
        if img_url:  # It is an image
            message = f'Примерно вот так: \n[IMG]{img_url}[/IMG]'  # Set the image block
        message = f'[QUOTE="{quoted_by}, post: {quote_source}"]{quote_text}[/QUOTE]{message}'

    # Retrieve the thread page and extract the _xfToken value from the hidden form field
    response = session.get(thread_url)
    soup = BeautifulSoup(response.text, 'html.parser')
    xf_token = soup.find('input', {'name': '_xfToken'}).get('value')

    # Construct the message data for the POST request
    message_data = {
        '_xfToken': xf_token,
        'message': message,
        'attachment_hash': '',
        'last_date': '',
        '_xfRequestUri': post_url,
        '_xfWithData': '1',
        '_xfResponseType': 'json'
    }
    response = session.post(post_url, data=message_data)

    # Check if the post was successful
    if not response.ok:
        print('Post failed!')
        # The `exit()` helper comes from the `site` module and is meant for
        # the interactive shell; raise SystemExit directly, with a failure status.
        raise SystemExit(1)
    print('Post submitted successfully.')
def getMessages(thread_url=config.thread_url, quotedUser="", startingPage=1):
    """Collect replies (messages containing a quote) and direct '@WarBot' messages.

    Pages are read from `startingPage` onward until a page request does not
    return status 200, or until compare_pages() reports that the current and
    the next page resolve to the same URL.

    Args:
        thread_url: Base URL of the thread; 'page-<n>' is appended to it.
        quotedUser: When non-empty, only messages whose `data-quote`
            attribute equals this name are returned.
        startingPage: Number of the first page to scan.

    Returns:
        A list of dicts with the keys 'reply', 'messengerName', 'messageID'
        and 'quotedID'. For direct '@WarBot' messages 'quotedID' is 'post: 0'.
    """
    allquotes = []
    page = startingPage  # Counter

    # These keep their last matched value when a message lacks the attribute.
    messengerName = ""
    messageID = ""
    quotedID = ""

    # Patterns to search in the HTML of each message.
    namePattern = re.compile('data-lb-caption-desc="(.*?) ·')
    messageIDPattern = re.compile('data-lb-id="(.*?)"')
    quotedIDPattern = re.compile('data-source="(.*?)"')
    quotedNamePattern = re.compile('data-quote="(.*?)"')

    while True:
        response = requests.get(thread_url + 'page-' + str(page))
        if response.status_code != 200:
            break

        # Parse the HTML content and find all the messages in the thread page
        soup = BeautifulSoup(response.content, 'html.parser')
        messageData = soup.find_all('div', {'class': 'message-userContent lbContainer js-lbContainer'})

        for data in messageData:
            try:
                # Serialise once; every pattern is searched in the same HTML.
                html = str(data)

                matchName = namePattern.search(html)
                if matchName:
                    messengerName = matchName.group(1)

                matchQuotedID = quotedIDPattern.search(html)
                if matchQuotedID:
                    quotedID = matchQuotedID.group(1)

                matchMessageID = messageIDPattern.search(html)
                if matchMessageID:
                    messageID = matchMessageID.group(1)

                # Reset per message so that a message without a quote is never
                # filtered using the quoted name of an earlier message.
                matchQuotedName = quotedNamePattern.search(html)
                quotedName = matchQuotedName.group(1) if matchQuotedName else ""

                if quotedUser and (quotedUser != quotedName):
                    continue

                text = data.find('div', {'class': 'bbWrapper'})
                if data.find('blockquote'):
                    # Drop the quoted parts, leaving only the reply text
                    for bq in text.find_all('blockquote'):
                        bq.extract()
                    reply = text.get_text().replace('\n', ' ').strip()
                    allquotes.append({'reply': reply, 'messengerName': messengerName,
                                      'messageID': messageID, 'quotedID': quotedID})
                elif text.get_text().startswith('@WarBot'):
                    # A direct message "@WarBot"
                    reply = text.get_text().replace('@WarBot', '').replace('\n', ' ').strip()
                    allquotes.append({'reply': reply, 'messengerName': messengerName,
                                      'messageID': messageID, 'quotedID': 'post: 0'})
            except Exception:
                # There was no text in this message (e.g. no bbWrapper), move to the next.
                # KeyboardInterrupt/SystemExit are deliberately not swallowed.
                continue

        # Stop when the next page resolves to the same URL as the current one
        if compare_pages(thread_url + 'page-' + str(page), thread_url + 'page-' + str(page + 1)):
            break
        page += 1

    return allquotes
def WarOnlineBot():
    """Run one bot session: find unanswered messages for the bot and reply to them.

    Steps:
      1. Log in and scan the last pages of the thread.
      2. Collect the IDs of posts the bot has already replied to.
      3. Collect messages quoting the bot plus direct '@WarBot' messages.
      4. For every unanswered message, build the model input (previous
         dialogue from the database + cleaned message), request a reply
         from WarClient (up to 3 attempts) and post it.

    Returns:
        0 when the session finished, -1 when any error occurred.
    """
    max_attempts = 3  # How many times to ask the model for a usable reply
    try:
        login(username=config.username, password=config.password, thread_url=config.thread_url)

        lookUpPages = 5  # How many pages back to look in the thread
        # Starting page cannot be less than 1
        startingPage = max(1, getLastPage(thread_url=config.thread_url) - lookUpPages)

        # All messages (with quotes) by ALL users:
        allMessages = getMessages(thread_url=config.thread_url, quotedUser='', startingPage=startingPage)

        answered_ids = set()   # IDs of the quoted messages already replied to by the bot
        direct_messages = []   # Direct messages to the bot
        for msg in allMessages:
            quoted_id = msg['quotedID'].split(': ')[-1]
            if quoted_id == '0':
                direct_messages.append(msg)
            if msg['messengerName'] == config.username and quoted_id:
                answered_ids.add(quoted_id)

        # All messages (with quotes) sent _FOR_ the Bot, plus the direct messages:
        messagesForBot = getMessages(thread_url=config.thread_url, quotedUser=config.username, startingPage=startingPage)
        messagesForBot.extend(direct_messages)

        # IDs of the unanswered messages addressed to the bot (empty IDs dropped)
        pending_ids = {msg['messageID'].split('-')[-1] for msg in messagesForBot}
        pending_ids.discard('')
        pending_ids -= answered_ids

        # Reply the unanswered messages:
        for msg in messagesForBot:
            msg_id = msg['messageID'].split('-')[-1]
            if msg_id not in pending_ids:
                continue
            # Handle each message once, even if it appears twice in the list.
            pending_ids.discard(msg_id)

            originalQuote = msg['reply']
            if originalQuote == "":  # Just images, no text
                continue
            quote = remove_extra_spaces(remove_non_english_russian_chars(originalQuote))
            print('Quote: ', originalQuote)

            # Init Connection
            db = conversationDB.DataBase()
            previous_dialogue = ""
            if msg['quotedID'].split(': ')[-1] != '0':
                # It is a dialogue: get the previous dialogue from the database
                for dbmessage in db.getmessages(msg['messengerName']):
                    previous_dialogue += dbmessage[0] + ' ' + dbmessage[1] + ' '

            # Update the string and preprocess it
            quote = remove_extra_spaces(remove_non_english_russian_chars(previous_dialogue + quote))
            # Truncate the quote to return only the last MaxWords of words:
            quote = " ".join(quote.split()[-config.MaxWords:])
            # Fix the quote string, to eliminate errors:
            quote = fixString(quote)

            # Ask the model for a reply; a reply containing '02' is treated as
            # a model error and retried.
            message = ""
            for _attempt in range(max_attempts):
                message = WarClient.getReply(message=quote)
                if message and '02' not in message:
                    break
                message = ""
            if not message:
                continue  # No usable reply after all attempts, skip that answer

            # Post-processing fixes:
            message = fixString(message)
            print('Reply: ', message)

            if message.endswith('.png'):  # It is an image reply
                # Will not be added to the database, if image is a reply
                login(username=config.username, password=config.password, thread_url=config.thread_url)
                time.sleep(1)
                post(message="", thread_url=config.thread_url, post_url=config.post_url,
                     quoted_by=msg['messengerName'], quote_text=originalQuote,
                     quote_source=msg['messageID'], img_url=message)
            else:
                # Add the new conversation pair to the database
                db.setmessages(username=msg['messengerName'], message_text=originalQuote, bot_reply=message)
                # Clean up the excessive records, leaving only the remaining messages
                db.cleanup(username=msg['messengerName'], remaining_messages=config.remaining_messages)
                # Delete the duplicate records
                db.deleteDuplicates()
                login(username=config.username, password=config.password, thread_url=config.thread_url)
                time.sleep(1)
                post(message=message, thread_url=config.thread_url, post_url=config.post_url,
                     quoted_by=msg['messengerName'], quote_text=originalQuote,
                     quote_source=msg['messageID'])

            time.sleep(10)  # Standby time for server load release

        return 0  # All is good
    except (Exception, SystemExit) as err:
        # SystemExit is included because login()/post() terminate that way on
        # failure and the scheduler is expected to retry. KeyboardInterrupt is
        # not caught, so Ctrl-C still stops the bot.
        print('Bad Connection')
        print(f'Error details: {err!r}')
        return -1  # Error
if __name__ == '__main__':
    # Scheduler: run bot sessions forever.
    while True:
        print('Starting Session')
        result = WarOnlineBot()

        # Debug Only:
        # imgWord = 'как выглядит'
        # login(username=config.username, password=config.password, thread_url=config.thread_url)
        # print("logged in")
        # post(message="", thread_url=config.thread_url, post_url=config.post_url, quoted_by='Test',
        #      quote_text="posting an image",img_url='https://replicate.delivery/pbxt/knKBiJt8DPZ0B1o25PaLJSZjgv3D5HcwLoBIn0JESbe3nISIA/out-0.png')

        if result != 0:
            # Session failed: rerun it after a short standby for server load release
            time.sleep(10)
            continue

        # Good result: wait config.timeout minutes, sleeping one second at a time
        print('Session finished. Timeout...')
        for _second in range(60 * config.timeout):
            time.sleep(1)