import logging
from re import split, sub
from threading import Lock
from time import sleep
from typing import Tuple, Dict
try:
    from extensions.telegram_bot.source.generators.abstract_generator import AbstractGenerator
except ImportError:
    from source.generators.abstract_generator import AbstractGenerator
try:
    import extensions.telegram_bot.source.const as const
    import extensions.telegram_bot.source.utils as utils
    import extensions.telegram_bot.source.generator as generator
    from extensions.telegram_bot.source.user import User as User
    from extensions.telegram_bot.source.conf import cfg
except ImportError:
    import source.const as const
    import source.utils as utils
    import source.generator as generator
    from source.user import User as User
    from source.conf import cfg
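# NOTE: the dual import paths above let this module run either as an
# extension (extensions.telegram_bot.*) or standalone (source.*)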
# Define generator lock to prevent GPU overloading
generator_lock = Lock()
# Debug flag: when True, the final prompt and the raw answer are printed to stdout
debug_flag = True
# ====================================================================================
# TEXT LOGIC
async def aget_answer(text_in: str, user: User, bot_mode: str, generation_params: Dict, name_in="") -> Tuple[str, str]:
    return await get_answer(text_in, user, bot_mode, generation_params, name_in)
@utils.async_wrap
def get_answer(text_in: str, user: User, bot_mode: str, generation_params: Dict, name_in="") -> Tuple[str, str]:
    # optional additional delay before answering
    if cfg.answer_delay > 0:
        sleep(cfg.answer_delay)
    # if generation fails, this "fail" answer is returned
    answer = const.GENERATOR_FAIL
    # default result action - send as an ordinary message
    return_msg_action = const.MSG_SEND
    # if name_in is empty, default to the user's name1
    name_in = user.name1 if name_in == "" else name_in
    # acquire the generator lock to prevent concurrent GPU use;
    # bail out if the timeout expires instead of generating without the lock
    if not generator_lock.acquire(timeout=cfg.generation_timeout):
        return const.GENERATOR_FAIL, const.MSG_SYSTEM
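    # NOTE: every return path below must release generator_lock exactly once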
    # user_input preprocessing
    try:
        # Preprocessing: actions which return a result immediately:
        if text_in.startswith(tuple(cfg.permanent_change_name2_prefixes)):
            # user_in starts with a permanent prefix - replace name2 (the bot name)
            user.name2 = text_in[2:]
            return_msg_action = const.MSG_SYSTEM
            generator_lock.release()
            return "New bot name: " + user.name2, return_msg_action
        if text_in.startswith(tuple(cfg.permanent_change_name1_prefixes)):
            # user_in starts with a permanent prefix - replace name1 (the user name)
            user.name1 = text_in[2:]
            return_msg_action = const.MSG_SYSTEM
            generator_lock.release()
            return "New user name: " + user.name1, return_msg_action
        if text_in.startswith(tuple(cfg.permanent_add_context_prefixes)):
            # user_in starts with a permanent prefix - append the text to the context
            user.context += "\n" + text_in[2:]
            return_msg_action = const.MSG_SYSTEM
            generator_lock.release()
            return "Added to context: " + text_in[2:], return_msg_action
        if text_in.startswith(tuple(cfg.replace_prefixes)):
            # user_in starts with a replace prefix - fully replace the last message
            user.change_last_message(history_out=text_in[1:])
            return_msg_action = const.MSG_DEL_LAST
            generator_lock.release()
            return user.history_last_out, return_msg_action
        if text_in == const.GENERATOR_MODE_DEL_WORD:
            # delete the trailing "word" (last fragment) of the last bot message
            last_message = user.history_last_out
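            # split on newlines / sentence-ending punctuation; the last fragment
            # (everything after the last boundary) is what gets deleted,
            # e.g. "Hi there. How are" -> cuts "How are"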
            last_word = split(r"\n|\.+ +|: +|! +|\? +|\' +|\" +|; +|\) +|\* +", last_message)[-1]
            if len(last_word) == 0 and len(last_message) > 1:
                last_word = " "
            new_last_message = last_message[: -(len(last_word))]
            new_last_message = new_last_message.strip()
            if len(new_last_message) == 0:
                return_msg_action = const.MSG_NOTHING_TO_DO
            else:
                user.change_last_message(history_out=new_last_message)
            generator_lock.release()
            return user.history_last_out, return_msg_action
        # Preprocessing: actions which do not depend on user input:
        if bot_mode in [const.MODE_QUERY]:
            user.history = []
        # regeneration - msg_id, text and name stay the same, but the last exchange is removed:
        if text_in == const.GENERATOR_MODE_REGENERATE:
            if str(user.msg_id[-1]) not in user.previous_history:
                user.previous_history.update({str(user.msg_id[-1]): []})
            user.previous_history[str(user.msg_id[-1])].append(user.history_last_out)
            text_in = user.text_in[-1]
            name_in = user.name_in[-1]
            last_msg_id = user.msg_id[-1]
            user.truncate_last_message()
            user.msg_id.append(last_msg_id)
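        # previous_history maps msg_id -> earlier outputs for that message, so
        # the final check below can detect a regeneration that reproduced the
        # same text (MSG_NOTHING_TO_DO)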
        # Preprocessing: add user_in/names/whitespace to history in the right order depending on mode:
        if bot_mode in [const.MODE_NOTEBOOK]:
            # notebook mode - append only user_in to history, no additional preparation
            user.text_in.append(text_in)
            user.history_append("", text_in)
        elif text_in == const.GENERATOR_MODE_IMPERSONATE:
            # impersonate - append only "name1:" to history; the "" history line
            # prevents a bug in the history sequence, "name1:" is the generation prefix
            user.text_in.append(text_in)
            user.name_in.append(name_in)
            user.history_append("", name_in + ":")
        elif text_in == const.GENERATOR_MODE_NEXT:
            # "next" - no user text, continue generation as the bot; the "" history
            # line prevents a bug in the history sequence, "name2:" is the generation prefix
            user.text_in.append(text_in)
            user.name_in.append(name_in)
            user.history_append("", user.name2 + ":")
        elif text_in == const.GENERATOR_MODE_CONTINUE:
            # continue - resume the last bot message, history stays as it is
            pass
        elif text_in.startswith(tuple(cfg.sd_api_prefixes)):
            # sd_api request - impersonate-like; the "" line prevents a bug in the
            # history sequence, the result is handled as an image request (MSG_SD_API)
            user.msg_id.append(0)
            user.text_in.append(text_in)
            user.name_in.append(name_in)
            if len(text_in) == 1:
                user.history_append("", cfg.sd_api_prompt_self)
            else:
                user.history_append("", cfg.sd_api_prompt_of.replace("OBJECT", text_in[1:].strip()))
            return_msg_action = const.MSG_SD_API
        elif text_in.startswith(tuple(cfg.impersonate_prefixes)):
            # impersonate prefix - generate from the point of view of the given name;
            # the "" line prevents a bug in the history sequence
            user.text_in.append(text_in)
            user.name_in.append(text_in[1:])
            user.history_append("", text_in[1:] + ":")
        else:
            # ordinary chat mode - add "name1:"/"name2:" to the user and bot messages
            # (generation from name2's point of view)
            user.text_in.append(text_in)
            user.name_in.append(name_in)
            user.history_append(name_in + ": " + text_in, user.name2 + ":")
    except Exception as exception:
        generator_lock.release()
        logging.error("get_answer (prepare text part) " + str(exception) + str(exception.args))
        return const.GENERATOR_FAIL, const.MSG_SYSTEM
    # Text processing with LLM
    try:
        # Set eos_token and stopping_strings.
        stopping_strings = generation_params["stopping_strings"].copy()
        eos_token = generation_params["eos_token"]
        if bot_mode in [const.MODE_CHAT, const.MODE_CHAT_R, const.MODE_ADMIN]:
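            # stop on "Name:" turn prefixes so the model does not write the
            # next speaker's reply by itself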
            stopping_strings += [
                name_in + ":",
                user.name1 + ":",
                user.name2 + ":",
            ]
        if cfg.bot_prompt_end != "":
            stopping_strings.append(cfg.bot_prompt_end)
        # adjust context/greeting/example
        # (strip() already removes trailing newlines, so one "\n" is always re-added)
        context = f"{user.context.strip()}\n"
        context = cfg.context_prompt_begin + context + cfg.context_prompt_end
        if len(user.example) > 0:
            example = user.example
        else:
            example = ""
        if len(user.greeting) > 0:
            greeting = "\n" + user.name2 + ": " + user.greeting
        else:
            greeting = ""
        # Make prompt: context + example + conversation history
        available_len = generation_params["truncation_length"]
        context_len = generator.get_tokens_count(context)
        available_len -= context_len
        if available_len < 0:
            available_len = 0
            logging.info("telegram_bot - CONTEXT IS TOO LONG!!!")
        conversation = [example, greeting]
        for i in user.history:
            if len(i["in"]) > 0:
                conversation.append("".join([cfg.user_prompt_begin, i["in"], cfg.user_prompt_end]))
            if len(i["out"]) > 0:
                conversation.append("".join([cfg.bot_prompt_begin, i["out"], cfg.bot_prompt_end]))
        # the last entry is the generation prefix - cut its bot_prompt_end
        # so the model continues it instead of stopping immediately
        if len(cfg.bot_prompt_end) and conversation[-1].endswith(cfg.bot_prompt_end):
            conversation[-1] = conversation[-1][: -len(cfg.bot_prompt_end)]
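        # assemble the prompt newest-to-oldest: prepend turns while they fit in
        # the token budget, so the most recent context survives truncation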
prompt = ""
for s in reversed(conversation):
s = "\n" + s if len(s) > 0 else s
s_len = generator.get_tokens_count(s)
if available_len >= s_len:
prompt = s + prompt
available_len -= s_len
else:
break
prompt = context + prompt
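        # normalize whitespace after name prefixes ("Name:   " -> "Name: ")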
        prompt = sub(r": +", ": ", prompt)
        # Generate!
        if debug_flag:
            print(prompt)
        answer = generator.generate_answer(
            prompt=prompt,
            generation_params=generation_params,
            eos_token=eos_token,
            stopping_strings=stopping_strings,
            default_answer=answer,
            turn_template=user.turn_template,
        )
        if debug_flag:
            print(answer)
        # Truncate prompt prefix/postfix leftovers from the answer
        if len(cfg.bot_prompt_end) > 0 and answer.endswith(cfg.bot_prompt_end):
            answer = answer[: -len(cfg.bot_prompt_end)]
        if len(cfg.bot_prompt_end) > 2 and answer.endswith(cfg.bot_prompt_end[:-1]):
            answer = answer[: -len(cfg.bot_prompt_end[:-1])]
        if len(cfg.bot_prompt_begin) > 0 and answer.startswith(cfg.bot_prompt_begin):
            answer = answer[len(cfg.bot_prompt_begin):]
        # If the generation result has zero length - return "Empty answer."
        if len(answer) < 1:
            answer = const.GENERATOR_EMPTY_ANSWER
        # Final return
        if answer not in [const.GENERATOR_EMPTY_ANSWER, const.GENERATOR_FAIL]:
            # if everything is ok - cut stopping strings from the tail, then
            # append the generated answer to the last history message
            for end in stopping_strings:
                if answer.endswith(end):
                    answer = answer[: -len(end)]
            user.change_last_message(history_out=user.history_last_out + " " + answer)
        generator_lock.release()
        if len(user.msg_id) > 0:
            if str(user.msg_id[-1]) in user.previous_history:
                if user.previous_history[str(user.msg_id[-1])][-1] == user.history_last_out:
                    # regeneration reproduced the same text - nothing to update
                    return_msg_action = const.MSG_NOTHING_TO_DO
        return user.history_last_out, return_msg_action
    except Exception as exception:
        logging.error("get_answer (generator part) " + str(exception) + str(exception.args))
        # release the generator lock in any case, then return the last message
        generator_lock.release()
        return_msg_action = const.MSG_SYSTEM
        return user.history_last_out, return_msg_action
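# --- minimal usage sketch (illustrative only; handler names are hypothetical) ---
# A caller such as a telegram message handler would be expected to do roughly:
#
#     answer, action = await aget_answer(text, user, const.MODE_CHAT, generation_params)
#     if action == const.MSG_SEND:
#         await send_reply(answer)  # send_reply is a hypothetical helper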