import datetime
import re
import socket
import sys
import traceback
from typing import Literal, Optional

import jieba
import json5
from jieba import analyse

from qwen_agent.log import logger


def get_local_ip():
    """Return the local IP address of this host as a string.

    A datagram socket is "connected" to a dummy address and the socket's own
    address is read back. If anything goes wrong, '127.0.0.1' is returned.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # doesn't even have to be reachable
        s.connect(('10.255.255.255', 1))
        ip = s.getsockname()[0]
    except Exception:
        ip = '127.0.0.1'
    finally:
        s.close()
    return ip


def print_traceback():
    """Log the traceback of the exception currently being handled at ERROR level."""
    logger.error(''.join(traceback.format_exception(*sys.exc_info())))


def has_chinese_chars(data) -> bool:
    """Return True if the string form of `data` contains a character in U+4E00..U+9FFF."""
    text = f'{data}'
    return re.search(r'[\u4e00-\u9fff]+', text) is not None


def get_current_date_str(
    lang: Literal['en', 'zh'] = 'en',
    hours_from_utc: Optional[int] = None,
) -> str:
    """Return a sentence stating the current date in English or Chinese.

    Args:
        lang: 'en' for an English sentence, 'zh' for a Chinese one.
        hours_from_utc: If None, the machine's local time is used. Otherwise
            the date is taken from UTC shifted by this many hours.

    Returns:
        The formatted date sentence.

    Raises:
        NotImplementedError: If `lang` is neither 'en' nor 'zh'.
    """
    if hours_from_utc is None:
        cur_time = datetime.datetime.now()
    else:
        # `datetime.utcnow()` is deprecated; an aware UTC "now" shifted by the
        # requested offset yields the same calendar fields.
        utc_now = datetime.datetime.now(datetime.timezone.utc)
        cur_time = utc_now + datetime.timedelta(hours=hours_from_utc)

    if lang == 'en':
        date_str = 'Current date: ' + cur_time.strftime('%A, %B %d, %Y')
    elif lang == 'zh':
        # weekday() is 0 for Monday, which lines up with the list below.
        weekday_names = ['一', '二', '三', '四', '五', '六', '日']
        date_str = f'当前时间:{cur_time.year}年{cur_time.month}月{cur_time.day}日,星期'
        date_str += weekday_names[cur_time.weekday()]
        date_str += '。'
    else:
        raise NotImplementedError
    return date_str


def save_text_to_file(path, text):
    """Write `text` to `path` as UTF-8, overwriting any existing file.

    Returns:
        The string 'SUCCESS' on success. On failure the traceback is logged
        and the caught exception object is returned (it is not raised).
    """
    try:
        with open(path, 'w', encoding='utf-8') as fp:
            fp.write(text)
        return 'SUCCESS'
    except Exception as ex:
        print_traceback()
        return ex


# Tokens that are dropped from keyword lists.
# NOTE(review): several punctuation entries appear twice in a row; they may
# originally have been half-width/full-width pairs - confirm against upstream.
ignore_words = [
    '', ' ', '\t', '\n', '\\', 'is', 'are', 'am', 'what', 'how', '的', '吗', '是',
    '了', '啊', '呢', '怎么', '如何', '什么', '?', '?', '!', '!', '“', '”', '‘', '’',
    "'", "'", '"', '"', ':', ':', '讲了', '描述', '讲', '说说', '讲讲', '介绍', '总结下',
    '总结一下', '文档', '文章', '文稿', '稿子', '论文', 'PDF', 'pdf', '这个', '这篇', '这',
    '我', '帮我', '那个', '下', '翻译'
]


def _drop_ignored(words):
    """Return `words` as a list without the entries present in `ignore_words`."""
    return [w for w in words if w not in ignore_words]


def get_split_word(text):
    """Lower-case `text`, segment it with `jieba.lcut`, and drop ignored tokens."""
    text = text.lower()
    return _drop_ignored(jieba.lcut(text.strip()))


def get_keyword_by_llm(text, keyword_agent):
    """Extract keywords for `text` using `keyword_agent`, with a jieba fallback.

    `keyword_agent.run(text)` is expected to return something `json5.loads`
    can parse. List values under 'keywords_zh' and 'keywords_en' are
    lower-cased, filtered through `ignore_words`, and followed by the tokens
    from `get_split_word(text)`.

    If the agent call or parsing fails, or the parsed result cannot be
    processed, only `get_split_word(text)` is returned.
    """
    try:
        res = keyword_agent.run(text)
        res = json5.loads(res)
    except Exception:
        print('keyword is not json format!')
        return get_split_word(text)

    # json format
    try:
        candidates = []
        for key in ('keywords_zh', 'keywords_en'):
            if key in res and isinstance(res[key], list):
                candidates.extend([kw.lower() for kw in res[key]])
        wordlist = _drop_ignored(candidates)
        wordlist.extend(get_split_word(text))
        return wordlist
    except Exception:
        # Unexpected shape (e.g. non-string keywords): fall back to plain segmentation.
        return get_split_word(text)


def get_key_word(text):
    """Return the tags from `jieba.analyse.extract_tags` for the lower-cased text,
    minus ignored tokens. The resulting list is also printed to stdout."""
    text = text.lower()
    wordlist = _drop_ignored(analyse.extract_tags(text))
    print('wordlist: ', wordlist)
    return wordlist


def get_last_one_line_context(text):
    """Return the last line of `text` that is not blank, or '' if there is none."""
    return next((line for line in reversed(text.split('\n')) if line.strip()), '')


def extract_urls(text):
    """Return every substring of `text` matching `https?://\\S+`, in order."""
    return re.findall(r'https?://\S+', text)


def extract_obs(text):
    """Return the content of the last `Observation:` section of `text`, stripped.

    The section starts right after the last '\\nObservation:' marker and ends
    at the first '\\nThought:' that follows it, or at the end of the text when
    no such marker follows. Returns '' if there is no observation marker.
    """
    marker = '\nObservation:'
    start = text.rfind(marker)
    if start < 0:
        return ''
    start += len(marker)
    end = text.find('\nThought:', start)
    if end < 0:
        # No trailing thought: the observation runs to the end of the text.
        end = len(text)
    return text[start:end].strip()


def extract_code(text):
    """Extract source code from `text`.

    The first triple-backtick fenced block wins. Otherwise `text` is parsed
    with json5 and its 'code' entry is returned. If that also fails, the
    traceback is logged and `text` is returned unchanged.
    """
    # Match triple backtick blocks first
    triple_match = re.search(r'```[^\n]*\n(.+?)```', text, re.DOTALL)
    if triple_match:
        text = triple_match.group(1)
    else:
        try:
            text = json5.loads(text)['code']
        except Exception:
            print_traceback()
    # If no code blocks found, return original text
    return text


def parse_latest_plugin_call(text):
    """Parse the last `Action` / `Action Input` pair out of `text`.

    Returns:
        A tuple `(plugin_name, plugin_args, text)`. When an action and its
        input are found, `text` is truncated just before the '\\nObservation:'
        marker that follows them. Otherwise both names are '' and `text` is
        returned unchanged.
    """
    plugin_name, plugin_args = '', ''
    i = text.rfind('\nAction:')
    j = text.rfind('\nAction Input:')
    k = text.rfind('\nObservation:')
    if 0 <= i < j:  # If the text has `Action` and `Action input`,
        if k < j:  # but does not contain `Observation`,
            # then it is likely that `Observation` is omitted by the LLM,
            # because the output text may have discarded the stop word.
            text = text.rstrip() + '\nObservation:'  # Add it back.
        k = text.rfind('\nObservation:')
        plugin_name = text[i + len('\nAction:'):j].strip()
        plugin_args = text[j + len('\nAction Input:'):k].strip()
        text = text[:k]
    return plugin_name, plugin_args, text


# TODO: Say no to these ugly if statements.
def format_answer(text):
    """Turn raw agent output into the text to display.

    The branch is chosen by substring tests on the whole `text`:

    * contains 'code_interpreter': returns the code from the latest action
      input in a fenced `py` block, followed by the observation when that
      observation contains '![fig'.
    * contains 'image_gen': returns one markdown image line per URL, taken
      from the observation's 'image_url' entry, or failing that from URLs
      found after 'Final Answer:'.
    * otherwise: returns the stripped text after the last 'Final Answer:'.
    """
    _, action_input, _ = parse_latest_plugin_call(text)
    if 'code_interpreter' in text:
        code = extract_code(action_input)
        rsp = '\n```py\n' + code + '\n```\n'
        obs = extract_obs(text)
        if '![fig' in obs:
            rsp += obs
        return rsp
    elif 'image_gen' in text:
        # get url of FA
        obs = text.split('Observation:')[-1].split('\nThought:')[0].strip()
        img_urls = []
        if obs:
            logger.info(repr(obs))
            try:
                obs = json5.loads(obs)
                img_urls.append(obs['image_url'])
            except Exception:
                print_traceback()
                img_urls = []
        if not img_urls:
            # Fall back to any URL mentioned in the final answer.
            img_urls = extract_urls(text.split('Final Answer:')[-1].strip())
        logger.info(img_urls)
        return ''.join('\n![picture](' + x.strip() + ')' for x in img_urls)
    else:
        return text.split('Final Answer:')[-1].strip()