import datetime
import hashlib
import os
import re
import stat
import sys
from io import BytesIO
from urllib.parse import unquote, urlparse

import jsonlines
from b2sdk.v2 import B2Api, InMemoryAccountInfo

from agent.log import logger
from agent.utils.doc_parser import parse_doc, parse_html_bs
from agent.utils.utils import print_traceback, save_text_to_file
from schema import Record


def _fix_secure_write_for_code_interpreter(code_interpreter_ws):
    """Set JUPYTER_ALLOW_INSECURE_WRITES when the filesystem ignores
    0o600 permissions (e.g. some mounted volumes on Linux)."""
    if 'linux' in sys.platform.lower():
        fname = os.path.join(code_interpreter_ws, 'test_file_permission.txt')
        if os.path.exists(fname):
            os.remove(fname)
        with os.fdopen(
                os.open(fname, os.O_CREAT | os.O_WRONLY | os.O_TRUNC, 0o0600),
                'w') as f:
            f.write('test')
        file_mode = stat.S_IMODE(os.stat(fname).st_mode) & 0o6677
        if file_mode != 0o0600:
            os.environ['JUPYTER_ALLOW_INSECURE_WRITES'] = '1'


work_space_root = './workspace'
cache_root = f'{work_space_root}/browser_cache/'
download_root = f'{work_space_root}/download/'
code_interpreter_ws = f'{work_space_root}/ci_workspace/'
cache_file_popup_url = os.path.join(cache_root, 'popup_url.jsonl')
cache_file = os.path.join(cache_root, 'browse.jsonl')
max_ref_token = 4000
max_days = 7

os.makedirs(work_space_root, exist_ok=True)
os.makedirs(cache_root, exist_ok=True)
os.makedirs(download_root, exist_ok=True)
os.makedirs(code_interpreter_ws, exist_ok=True)

code_interpreter_work_dir = code_interpreter_ws
os.environ['M6_CODE_INTERPRETER_WORK_DIR'] = code_interpreter_work_dir
os.environ['M6_CODE_INTERPRETER_STATIC_URL'] = f'{os.getenv("DOMAIN")}/static'
os.environ['HF_HOME'] = '.cache/huggingface/'
os.environ['MPLCONFIGDIR'] = '.cache/huggingface/'
_fix_secure_write_for_code_interpreter(code_interpreter_ws)


class B2Manager:
    """Key-value store backed by a Backblaze B2 bucket; file names are
    namespaced per access token."""

    def __init__(self):
        info = InMemoryAccountInfo()
        b2_api = B2Api(info)
        application_key_id = os.environ.get('b2_key_id')
        application_key = os.environ.get('b2_key')
        b2_api.authorize_account('production', application_key_id, application_key)
        self.b2_bucket = b2_api.get_bucket_by_name(os.environ.get('b2_bucket_name'))
        self.b2_api = b2_api

    def gen_file_name(self, access_token, url, need_md5):
        # Hash the URL by default so arbitrary URLs become safe B2 file names.
        if need_md5:
            url_md5 = hashlib.md5(url.encode('utf-8')).hexdigest()
            return f'{access_token}/{url_md5}'
        return f'{access_token}/{url}'

    def get(self, access_token, url, need_md5=True):
        in_memory_file = BytesIO()
        self.b2_bucket.download_file_by_name(
            self.gen_file_name(access_token, url, need_md5)).save(in_memory_file)
        in_memory_file.seek(0)
        return str(in_memory_file.read(), 'utf-8')

    def upsert(self, access_token, url, content, need_md5=True):
        self.b2_bucket.upload_bytes(
            content.encode('utf-8'),
            self.gen_file_name(access_token, url, need_md5),
            file_infos=None)

    def delete(self, access_token, url, need_md5=True):
        # Hide rather than hard-delete: the file disappears from listings,
        # but older versions remain recoverable in the bucket.
        file_version_info = self.b2_bucket.get_file_info_by_name(
            self.gen_file_name(access_token, url, need_md5))
        self.b2_bucket.hide_file(file_version_info.file_name)

    def list_files(self, access_token):
        files = []
        for file_version_info, folder_name in self.b2_bucket.ls(
                folder_to_list=f'{access_token}/'):
            # The upload timestamp is in milliseconds; divide by 1000 to get seconds.
            upload_timestamp = datetime.datetime.fromtimestamp(
                file_version_info.upload_timestamp / 1000.0)
files.append(f"File Name: {file_version_info.file_name}, \nUpload timestamp: {upload_timestamp}, \nMetadata: {file_version_info.file_info}") return files def exists(self, access_token, url=None, need_md5=True): try: self.b2_bucket.get_file_info_by_name(self.gen_file_name(access_token, url, need_md5)) return True except: return False def update_pop_url(data, cache_file_popup_url, access_token): new_line = {'url': data['url'], "access_token": access_token} lines = [] for line in jsonlines.open(cache_file_popup_url): if line['access_token'] == access_token and line['url'] != data['url']: lines.append(line) lines.append(new_line) with jsonlines.open(cache_file_popup_url, mode='w') as writer: for new_line in lines: writer.write(new_line) response = 'Update URL' return response def change_checkbox_state(text, cache_file, access_token): if not os.path.exists(cache_file): return {'result': 'no file'} lines = [] for line in jsonlines.open(cache_file): if line['access_token'] == access_token and line['url'] == text[3:]: if line['checked']: line['checked'] = False else: line['checked'] = True lines.append(line) with jsonlines.open(cache_file, mode='w') as writer: for new_line in lines: writer.write(new_line) return {'result': 'changed'} def is_local_path(path): if path.startswith('https://') or path.startswith('http://'): return False return True def sanitize_chrome_file_path(file_path: str) -> str: # For Linux and macOS. if os.path.exists(file_path): return file_path # For native Windows, drop the leading '/' in '/C:/' win_path = file_path if win_path.startswith('/'): win_path = win_path[1:] if os.path.exists(win_path): return win_path # For Windows + WSL. if re.match(r'^[A-Za-z]:/', win_path): wsl_path = f'/mnt/{win_path[0].lower()}/{win_path[3:]}' if os.path.exists(wsl_path): return wsl_path # For native Windows, replace / with \. 
    win_path = win_path.replace('/', '\\')
    if os.path.exists(win_path):
        return win_path

    return file_path


def extract_and_cache_document(data, cache_root, access_token):
    logger.info('Starting cache pages...')
    if data['url'].split('.')[-1].lower() in ['pdf', 'docx', 'pptx']:
        date1 = datetime.datetime.now()
        # Write a placeholder record first so the document shows up as processing.
        new_record = Record(url=data['url'],
                            time='',
                            type=data['type'],
                            raw=[],
                            extract='',
                            access_token=access_token,
                            topic='',
                            checked=False,
                            session=[])
        service.upsert(access_token, data['url'], new_record.model_dump_json())
        if data['url'].startswith('https://') or data['url'].startswith('http://'):
            pdf_path = data['url']
        else:
            parsed_url = urlparse(data['url'])
            pdf_path = unquote(parsed_url.path)
            pdf_path = sanitize_chrome_file_path(pdf_path)
        try:
            pdf_content = parse_doc(pdf_path)
        except Exception:
            print_traceback()
            # Parsing failed: remove the placeholder record.
            service.delete(access_token, data['url'])
            return 'failed'
        date2 = datetime.datetime.now()
        logger.info('Parsing pdf time: ' + str(date2 - date1))
        data['content'] = pdf_content
        data['type'] = 'pdf'
        # Use the bare file name (no directories, no extension) as the title.
        extract = pdf_path.split('/')[-1].split('\\')[-1].split('.')[0]
    elif data['content'] and data['type'] == 'html':
        new_record = Record(url=data['url'],
                            time='',
                            type=data['type'],
                            raw=[],
                            extract='',
                            access_token=access_token,
                            topic='',
                            checked=False,
                            session=[])
        service.upsert(access_token, data['url'], new_record.model_dump_json())
        try:
            tmp_html_file = os.path.join(cache_root, 'tmp.html')
            save_text_to_file(tmp_html_file, data['content'])
            data['content'] = parse_html_bs(tmp_html_file)
        except Exception:
            print_traceback()
        extract = data['content'][0]['metadata']['title']
    else:
        logger.error(
            "Only Support the Following File Types: ['.html', '.pdf', '.docx', '.pptx']")
        raise NotImplementedError
    today = datetime.date.today()
    # Overwrite the placeholder with the fully parsed, checked record.
    new_record = Record(url=data['url'],
                        time=str(today),
                        type=data['type'],
                        raw=data['content'],
                        extract=extract,
                        access_token=access_token,
                        topic='',
                        checked=True,
                        session=[])
    service.upsert(access_token, data['url'], new_record.model_dump_json())
    return 'Cached'


service = B2Manager()

if __name__ == '__main__':
    # print(service.gen_file_name('test', 'settings.xml'))
    # print(service.get('test', 'settings.xml'))
    # print(service.upsert('test', 'settings.xml', b'1111'))
    print(service.list_files('test'))
    print(service.exists('test', 'https://tree-iad1-0003.secure.backblaze.com/b2_browse_files2.htm1'))