Yijun-Yang's picture
updategradiofrontend
92bcd1d
raw
history blame
No virus
23.5 kB
# Copyright (c) OpenMMLab. All rights reserved.
"""Pipeline."""
import argparse
import datetime
import re
import time
import os
import json
import pytoml
from loguru import logger
from .helper import ErrorCode, QueryTracker
from .llm_client import ChatClient
from .retriever import CacheRetriever
from .sg_search import SourceGraphProxy
from .web_search import WebSearch
def convertid2url(text):
# Regular expression to find all PMC references
pattern = r"\[PMC(\d+)\]"
# Function to replace each match with a URL link
replacement = lambda match: f"[PMC{match.group(1)}](https://www.ncbi.nlm.nih.gov/pmc/articles/PMC{match.group(1)}/)"
# Replace all occurrences in the text
formatted_text = re.sub(pattern, replacement, text)
return formatted_text
class Worker:
"""The Worker class orchestrates the logic of handling user queries,
generating responses and managing several aspects of a chat assistant. It
enables feature storage, language model client setup, time scheduling and
much more.
Attributes:
llm: A ChatClient instance that communicates with the language model.
fs: An instance of FeatureStore for loading and querying features.
config_path: A string indicating the path of the configuration file.
config: A dictionary holding the configuration settings.
language: A string indicating the language of the chat, default is 'zh' (Chinese). # noqa E501
context_max_length: An integer representing the maximum length of the context used by the language model. # noqa E501
Several template strings for various prompts are also defined.
"""
def __init__(self, work_dir: str, config_path: str, language: str = 'zh'):
"""Constructs all the necessary attributes for the worker object.
Args:
work_dir (str): The working directory where feature files are located.
config_path (str): The location of the configuration file.
language (str, optional): Specifies the language to be used. Defaults to 'zh' (Chinese). # noqa E501
"""
self.llm = ChatClient(config_path=config_path) #每次实例化worker都会重新实例化chatclient并读取本地最新的配置文件
self.retriever = CacheRetriever(config_path=config_path).get(work_dir=work_dir)
self.config_path = config_path
self.config = None
self.language = language
with open(config_path, encoding='utf8') as f:
self.config = pytoml.load(f)
if self.config is None:
raise Exception('worker config can not be None')
llm_config = self.config['llm']
self.context_max_length = llm_config['server']['local_llm_max_text_length']
self.logsavepath = os.path.join(self.config['feature_store']['work_dir'],self.config['worker']['save_path'])
if not os.path.exists(self.logsavepath.replace(self.logsavepath.split('/')[-1],'')):
os.makedirs(self.logsavepath.replace(self.logsavepath.split('/')[-1],''))
if llm_config['enable_remote']:
self.context_max_length = llm_config['server'][
'remote_llm_max_text_length']
# Switch languages according to the scenario.
if self.language == 'zh':
self.TOPIC_TEMPLATE = '告诉我这句话的主题,直接说主题不要解释:“{}”'
self.SCORING_QUESTION_TEMPLTE = '“{}”\n请仔细阅读以上内容,判断句子是否是个有主题的疑问句,结果用 0~10 表示。直接提供得分不要解释。\n判断标准:有主语谓语宾语并且是疑问句得 10 分;缺少主谓宾扣分;陈述句直接得 0 分;不是疑问句直接得 0 分。直接提供得分不要解释。' # noqa E501
self.SCORING_RELAVANCE_TEMPLATE = '问题:“{}”\n材料:“{}”\n请仔细阅读以上内容,判断问题和材料的关联度,用0~10表示。判断标准:非常相关得 10 分;完全没关联得 0 分。直接提供得分不要解释。\n' # noqa E501
self.KEYWORDS_TEMPLATE = '谷歌搜索是一个通用搜索引擎,可用于访问互联网、查询百科知识、了解时事新闻等。搜索参数类型 string, 内容是短语或关键字,以空格分隔。\n你现在是{}交流群里的技术助手,用户问“{}”,你打算通过谷歌搜索查询相关资料,请提供用于搜索的关键字或短语,不要解释直接给出关键字或短语。' # noqa E501
self.SECURITY_TEMAPLTE = '判断以下句子是否涉及政治、辱骂、色情、恐暴、宗教、网络暴力、种族歧视等违禁内容,结果用 0~10 表示,不要解释直接给出得分。判断标准:涉其中任一问题直接得 10 分;完全不涉及得 0 分。直接给得分不要解释:“{}”' # noqa E501
self.PERPLESITY_TEMPLATE = '“question:{} answer:{}”\n阅读以上对话,answer 是否在表达自己不知道,回答越全面得分越少,用0~10表示,不要解释直接给出得分。\n判断标准:准确回答问题得 0 分;答案详尽得 1 分;知道部分答案但有不确定信息得 8 分;知道小部分答案但推荐求助其他人得 9 分;不知道任何答案直接推荐求助别人得 10 分。直接打分不要解释。' # noqa E501
self.SUMMARIZE_TEMPLATE = '{} \n 仔细阅读以上内容,总结得简短有力点' # noqa E501
self.GENERATE_TEMPLATE = '材料:“{}”\n 问题:“{}” \n 请仔细阅读参考材料回答问题,材料可能和问题无关。如果材料和问题无关,尝试用你自己的理解来回答问题。如果无法确定答案,直接回答不知道。' # noqa E501
self.GENERATE_TEMPLATE = '材料:“{}”\n 问题:“{}” \n 请仔细阅读参考材料回答问题,回答中附上对应内容的参考id,例如:治疗方法的主要决定因素是年龄、合并症和诊断分子特征[PMC9958586]' # yyj
self.ANNOTATE_CLUSTER = '这是关于{}的不同论文的分块句子,它们通过相似性进行了聚类,以下是其中一个聚类的10个样本:“{}”\n 请用一句话标注这个聚类。' # noqa E501
self.INSPIRATION_TEMPLATE = '以下是一些有关{0}的文章内容的总结 {1},请提出一个关于{0}的综述子问题,一个问题即可。'
else:
self.TOPIC_TEMPLATE = 'Tell me the theme of this sentence, just state the theme without explanation: "{}"' # noqa E501
self.SCORING_QUESTION_TEMPLTE = '"{}"\nPlease read the content above carefully and judge whether the sentence is a thematic question. Rate it on a scale of 0-10. Only provide the score, no explanation.\nThe criteria are as follows: a sentence gets 10 points if it has a subject, predicate, object and is a question; points are deducted for missing subject, predicate or object; declarative sentences get 0 points; sentences that are not questions also get 0 points. Just give the score, no explanation.' # noqa E501
self.SCORING_RELAVANCE_TEMPLATE = 'Question: "{}", Background Information: "{}"\nPlease read the content above carefully and assess the relevance between the question and the material on a scale of 0-10. The scoring standard is as follows: extremely relevant gets 10 points; completely irrelevant gets 0 points. Only provide the score, no explanation needed.' # noqa E501
self.KEYWORDS_TEMPLATE = 'Google search is a general-purpose search engine that can be used to access the internet, look up encyclopedic knowledge, keep abreast of current affairs and more. Search parameters type: string, content consists of phrases or keywords separated by spaces.\nYou are now the assistant in the "{}" communication group. A user asked "{}", you plan to use Google search to find related information, please provide the keywords or phrases for the search, no explanation, just give the keywords or phrases.' # noqa E501
self.SECURITY_TEMAPLTE = 'Evaluate whether the following sentence involves prohibited content such as politics, insult, pornography, terror, religion, cyber violence, racial discrimination, etc., rate it on a scale of 0-10, do not explain, just give the score. The scoring standard is as follows: any violation directly gets 10 points; completely unrelated gets 0 points. Give the score, no explanation: "{}"' # noqa E501
self.PERPLESITY_TEMPLATE = 'Question: {} Answer: {}\nRead the dialogue above, does the answer express that they don\'t know? The more comprehensive the answer, the lower the score. Rate it on a scale of 0-10, no explanation, just give the score.\nThe scoring standard is as follows: an accurate answer to the question gets 0 points; a detailed answer gets 1 point; knowing some answers but having uncertain information gets 8 points; knowing a small part of the answer but recommends seeking help from others gets 9 points; not knowing any of the answers and directly recommending asking others for help gets 10 points. Just give the score, no explanation.' # noqa E501
self.SUMMARIZE_TEMPLATE = '"{}" \n Read the content above carefully, summarize it in a short and powerful way.' # noqa E501
self.GENERATE_TEMPLATE = 'Background Information: "{}"\n Question: "{}"\n Please read the reference material carefully and answer the question with reference id at the end of the corresponding content for example: Primary determinants of the therapeutic approach are age, comorbidities, and diagnostic molecular profile [PMC9958586]' # yyj
self.ANNOTATE_CLUSTER = 'these are chunklized sentences from different papers about{}, they are clustered by similarity, the following is 10 samples from one of the cluster: "{}"\n Please tag the cluster in one breif sentence.'
self.INSPIRATION_TEMPLATE = 'Given the following summary of the articles content about {0} {1}, give some idea or sub-questions of the review about {0}, one question is sufficient.' # noqa E501
def single_judge(self, prompt, tracker, throttle: int, default: int,backend):
"""Generates a score based on the prompt, and then compares it to
threshold.
Args:
prompt (str): The prompt for the language model.
tracker (obj): An instance of QueryTracker logs the operations.
throttle (int): Threshold value to compare the score against.
default (int): Default score to be assigned in case of failure in score calculation. # noqa E501
Returns:
bool: True if the score surpasses the throttle, otherwise False.
"""
if prompt is None or len(prompt) == 0:
return False
score = default
relation = self.llm.generate_response(prompt=prompt, backend=backend)
tracker.log('score', [relation, throttle, default])
filtered_relation = ''.join([c for c in relation if c.isdigit()])
try:
score_str = re.sub(r'[^\d]', ' ', filtered_relation).strip()
score = int(score_str.split(' ')[0])
except Exception as e:
logger.error(str(e))
if score >= throttle:
return True
return False
def work_time(self):
"""Determines if the current time falls within the scheduled working
hours of the chat assistant.
Returns:
bool: True if the current time is within working hours, otherwise False. # noqa E501
"""
time_config = self.config['worker']['time']
beginWork = datetime.datetime.now().strftime(
'%Y-%m-%d') + ' ' + time_config['start']
endWork = datetime.datetime.now().strftime(
'%Y-%m-%d') + ' ' + time_config['end']
beginWorkSeconds = time.time() - time.mktime(
time.strptime(beginWork, '%Y-%m-%d %H:%M:%S'))
endWorkSeconds = time.time() - time.mktime(
time.strptime(endWork, '%Y-%m-%d %H:%M:%S'))
if int(beginWorkSeconds) > 0 and int(endWorkSeconds) < 0:
if not time_config['has_weekday']:
return True
if int(datetime.datetime.now().weekday()) in range(7):
return True
return False
def generate(self, query, history, groupname,backend):
"""Processes user queries and generates appropriate responses. It
involves several steps including checking for valid questions,
extracting topics, querying the feature store, searching the web, and
generating responses from the language model.
Args:
query (str): User's query.
history (str): Chat history.
groupname (str): The group name in which user asked the query.
backend (str): The backend to use for generating the response.
Returns:
ErrorCode: An error code indicating the status of response generation. # noqa E501
str: Generated response to the user query.
references: List for referenced filename or web url
"""
response = ''
references = []
if not self.work_time():
return ErrorCode.NOT_WORK_TIME, response, references
if query is None or len(query) < 6:
return ErrorCode.NOT_A_QUESTION, response, references
reborn_code = ErrorCode.SUCCESS
tracker = QueryTracker(self.logsavepath)
tracker.log('input', [query, history, groupname])
if not self.single_judge(
prompt=self.SCORING_QUESTION_TEMPLTE.format(query),
tracker=tracker,
throttle=6,
default=3,backend=backend):
return ErrorCode.NOT_A_QUESTION, response, references
topic = self.llm.generate_response(self.TOPIC_TEMPLATE.format(query),backend=backend)
tracker.log('topic', topic)
if len(topic) <= 2:
return ErrorCode.NO_TOPIC, response, references
# chunk, db_context, references = self.retriever.query(
# topic,
# context_max_length=self.context_max_length -
# 2 * len(self.GENERATE_TEMPLATE))
chunks, references = self.retriever.query(
topic,
context_max_length=self.context_max_length - 2 * len(self.GENERATE_TEMPLATE),
tracker=tracker)
# if db_context is None:
# tracker.log('feature store reject')
# return ErrorCode.UNRELATED, response, references
context = ''
refs = []
for chunk,ref in zip(chunks,references):
if self.single_judge(self.SCORING_RELAVANCE_TEMPLATE.format(query, chunk),
tracker=tracker,
throttle=5,
default=10,backend=backend):
context += f"reference: {ref} content: {chunk}"
context += '\n\n'
refs.append(ref)
refs = list(set(refs))
if len(context) > 0:
prompt, history = self.llm.build_prompt(
instruction=query,
context=context,
history_pair=history,
template=self.GENERATE_TEMPLATE)
response = self.llm.generate_response(prompt=prompt,
backend=backend,
history=history)
tracker.log('feature store doc', [chunk, response])
response = convertid2url(response)
return ErrorCode.SUCCESS, response, refs
else:
return ErrorCode.NO_SEARCH_RESULT, response, references
# try:
# references = []
# web_context = ''
# web_search = WebSearch(config_path=self.config_path)
# articles, error = web_search.get(query=topic, max_article=2)
# if error is not None:
# return ErrorCode.SEARCH_FAIL, response, references
# tracker.log('search returned')
# web_context_max_length = self.context_max_length - 2 * len(
# self.SCORING_RELAVANCE_TEMPLATE)
# for article in articles:
# if len(article) > 0:
# article.cut(0, web_context_max_length)
# if self.single_judge(
# self.SCORING_RELAVANCE_TEMPLATE.format(
# query, article.content),
# tracker=tracker,
# throttle=5,
# default=10):
# web_context += '\n\n'
# web_context += article.content
# references.append(article.source)
# web_context = web_context[0:web_context_max_length]
# web_context = web_context.strip()
# if len(web_context) > 0:
# prompt, history = self.llm.build_prompt(
# instruction=query,
# context=web_context,
# history_pair=history,
# template=self.GENERATE_TEMPLATE)
# response = self.llm.generate_response(prompt=prompt,
# history=history)
# else:
# reborn_code = ErrorCode.NO_SEARCH_RESULT
# tracker.log('web response', [web_context, response, reborn_code])
# except Exception as e:
# logger.error(e)
# if response is not None and len(response) > 0:
# prompt = self.PERPLESITY_TEMPLATE.format(query, response)
# if self.single_judge(prompt=prompt,
# tracker=tracker,
# throttle=10,
# default=0,backend = backend):
# reborn_code = ErrorCode.BAD_ANSWER
# if self.config['worker']['enable_sg_search']:
# if reborn_code == ErrorCode.BAD_ANSWER or reborn_code == ErrorCode.NO_SEARCH_RESULT: # noqa E501
# # reborn
# sg = SourceGraphProxy(config_path=self.config_path,
# language=self.language)
# sg_context = sg.search(llm_client=self.llm,
# question=query,
# groupname=groupname)
# if sg_context is not None and len(sg_context) > 2:
# prompt, history = self.llm.build_prompt(
# instruction=query,
# context=sg_context,
# history_pair=history,
# template=self.GENERATE_TEMPLATE)
# response = self.llm.generate_response(prompt=prompt,
# history=history,
# backend='remote')
# tracker.log('source graph', [sg_context, response])
# prompt = self.PERPLESITY_TEMPLATE.format(query, response)
# if self.single_judge(prompt=prompt,
# tracker=tracker,
# throttle=9,
# default=0):
# return ErrorCode.BAD_ANSWER, response, references
# if response is not None and len(response) >= 800:
# # reply too long, summarize it
# response = self.llm.generate_response(
# prompt=self.SUMMARIZE_TEMPLATE.format(response),
# backend=backend)
# if len(response) > 0 and self.single_judge(
# self.SECURITY_TEMAPLTE.format(response),
# tracker=tracker,
# throttle=3,
# default=0):
# return ErrorCode.SECURITY, response, references
# if reborn_code != ErrorCode.SUCCESS:
# return reborn_code, response, references
# return ErrorCode.SUCCESS, response, references
def annotate_cluster(self,theme, cluster_no, chunk, history, groupname,backend):
"""Annotates a cluster of questions based on the user query and
generates appropriate responses.
Args:
cluster_no (str): The cluster number of the questions.
chunks (str): A cluster of questions.
history (str): Chat history.
groupname (str): The group name in which user asked the query.
backend (str): The backend to use for generating the response.
Returns:
ErrorCode: An error code indicating the status of response generation. # noqa E501
str: Generated response to the user query.
references: List for referenced filename or web url
"""
response = ''
references = []
if not self.work_time():
return ErrorCode.NOT_WORK_TIME, response, references
tracker = QueryTracker(self.logsavepath)
tracker.log('input', [chunk[:1000], history, groupname])
response = self.llm.generate_response(prompt=self.ANNOTATE_CLUSTER.format(theme,chunk),
backend=backend,
history=history)
if response is not None and len(response) >= 800:
# reply too long, summarize it
response = self.llm.generate_response(
prompt=self.SUMMARIZE_TEMPLATE.format(response),
backend=backend,
history=history)
tracker.log('annotate cluster', [cluster_no,chunk, response])
return ErrorCode.SUCCESS, response, cluster_no
def getinspiration(self,theme,annotations,history,groupname,backend):
"""
give some idea of the review given the summary of the articals content
"""
response = ''
references = []
if not self.work_time():
return ErrorCode.NOT_WORK_TIME, response, references
tracker = QueryTracker(self.logsavepath)
tracker.log('input', [annotations[:1000], history, groupname])
response = self.llm.generate_response(prompt=self.INSPIRATION_TEMPLATE.format(theme,annotations),
backend=backend,
history=history)
tracker.log('get inspiration', [theme,annotations, response])
return ErrorCode.SUCCESS, response
def parse_args():
"""Parses command-line arguments."""
parser = argparse.ArgumentParser(description='Worker.')
parser.add_argument('work_dir', type=str, help='Working directory.')
parser.add_argument(
'--config_path',
default='config.ini',
help='Worker configuration path. Default value is config.ini')
return parser.parse_args()
if __name__ == '__main__':
args = parse_args()
bot = Worker(work_dir=args.work_dir, config_path=args.config_path)
queries = ['茴香豆是怎么做的']
for example in queries:
print(bot.generate(query=example, history=[], groupname=''))