|
import requests |
|
from config import conf |
|
from common.log import logger |
|
import os |
|
|
|
|
|
class LinkSummary: |
|
def __init__(self): |
|
pass |
|
|
|
def summary_file(self, file_path: str): |
|
file_body = { |
|
"file": open(file_path, "rb"), |
|
"name": file_path.split("/")[-1], |
|
} |
|
url = self.base_url() + "/v1/summary/file" |
|
res = requests.post(url, headers=self.headers(), files=file_body, timeout=(5, 300)) |
|
return self._parse_summary_res(res) |
|
|
|
def summary_url(self, url: str): |
|
body = { |
|
"url": url |
|
} |
|
res = requests.post(url=self.base_url() + "/v1/summary/url", headers=self.headers(), json=body, timeout=(5, 180)) |
|
return self._parse_summary_res(res) |
|
|
|
def summary_chat(self, summary_id: str): |
|
body = { |
|
"summary_id": summary_id |
|
} |
|
res = requests.post(url=self.base_url() + "/v1/summary/chat", headers=self.headers(), json=body, timeout=(5, 180)) |
|
if res.status_code == 200: |
|
res = res.json() |
|
logger.debug(f"[LinkSum] chat open, res={res}") |
|
if res.get("code") == 200: |
|
data = res.get("data") |
|
return { |
|
"questions": data.get("questions"), |
|
"file_id": data.get("file_id") |
|
} |
|
else: |
|
res_json = res.json() |
|
logger.error(f"[LinkSum] summary error, status_code={res.status_code}, msg={res_json.get('message')}") |
|
return None |
|
|
|
def _parse_summary_res(self, res): |
|
if res.status_code == 200: |
|
res = res.json() |
|
logger.debug(f"[LinkSum] url summary, res={res}") |
|
if res.get("code") == 200: |
|
data = res.get("data") |
|
return { |
|
"summary": data.get("summary"), |
|
"summary_id": data.get("summary_id") |
|
} |
|
else: |
|
res_json = res.json() |
|
logger.error(f"[LinkSum] summary error, status_code={res.status_code}, msg={res_json.get('message')}") |
|
return None |
|
|
|
def base_url(self): |
|
return conf().get("linkai_api_base", "https://api.link-ai.chat") |
|
|
|
def headers(self): |
|
return {"Authorization": "Bearer " + conf().get("linkai_api_key")} |
|
|
|
def check_file(self, file_path: str, sum_config: dict) -> bool: |
|
file_size = os.path.getsize(file_path) // 1000 |
|
|
|
if (sum_config.get("max_file_size") and file_size > sum_config.get("max_file_size")) or file_size > 15000: |
|
logger.warn(f"[LinkSum] file size exceeds limit, No processing, file_size={file_size}KB") |
|
return False |
|
|
|
suffix = file_path.split(".")[-1] |
|
support_list = ["txt", "csv", "docx", "pdf", "md", "jpg", "jpeg", "png"] |
|
if suffix not in support_list: |
|
logger.warn(f"[LinkSum] unsupported file, suffix={suffix}, support_list={support_list}") |
|
return False |
|
|
|
return True |
|
|
|
def check_url(self, url: str): |
|
if not url: |
|
return False |
|
support_list = ["http://mp.weixin.qq.com", "https://mp.weixin.qq.com"] |
|
black_support_list = ["https://mp.weixin.qq.com/mp/waerrpage"] |
|
for black_url_prefix in black_support_list: |
|
if url.strip().startswith(black_url_prefix): |
|
logger.warn(f"[LinkSum] unsupported url, no need to process, url={url}") |
|
return False |
|
for support_url in support_list: |
|
if url.strip().startswith(support_url): |
|
return True |
|
return False |
|
|