Spaces:
Running
Running
| # ============================================ | |
| # text_utils.py | |
| # ํ์ผ ์ถ์ถ, ์น ๊ฒ์, ๊ธฐ๋ณธ ํ ์คํธ ์ฒ๋ฆฌ ํจ์๋ค | |
| # ============================================ | |
| import re, os, json, time, zipfile, tempfile, zlib | |
| from pathlib import Path | |
| from collections import Counter | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from xml.etree import ElementTree as ET | |
# Optional dependency probes: each HAS_* flag records whether the matching
# third-party library imported successfully in this environment. Callers
# check the flag before touching the module name.
HAS_HTTPX = True
try:
    import httpx
except ImportError:
    HAS_HTTPX = False
HAS_PDFPLUMBER = True
try:
    import pdfplumber
except ImportError:
    HAS_PDFPLUMBER = False
HAS_PYPDF2 = True
try:
    import PyPDF2
except ImportError:
    HAS_PYPDF2 = False
HAS_DOCX = True
try:
    from docx import Document as DocxDocument
except ImportError:
    HAS_DOCX = False
HAS_OLEFILE = True
try:
    import olefile
except ImportError:
    HAS_OLEFILE = False
| # ============================================ | |
| # ํ์ผ ์ถ์ถ ํจ์๋ค | |
| # ============================================ | |
def extract_text_from_pdf(file_path):
    """Extract text from a PDF, one string per page.

    Tries pdfplumber first, then falls back to PyPDF2.
    Returns (pages, None) on success or (None, error_message) on failure.
    """
    if HAS_PDFPLUMBER:
        try:
            pages = []
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    text = page.extract_text()
                    if text:
                        pages.append(text)
            if pages:
                return pages, None
        except Exception as e:
            print(f"pdfplumber: {e}")
    if HAS_PYPDF2:
        try:
            # Fresh list: a mid-file pdfplumber failure may have left partial
            # pages behind; reusing that list would duplicate them here.
            pages = []
            with open(file_path, 'rb') as f:
                reader = PyPDF2.PdfReader(f)
                for page in reader.pages:
                    text = page.extract_text()
                    if text:
                        pages.append(text)
            if pages:
                return pages, None
        except Exception as e:
            print(f"PyPDF2: {e}")
    return None, "PDF ์ถ์ถ ์คํจ"
def extract_text_from_docx(file_path):
    """Extract text from a DOCX file, grouping paragraphs into sections.

    Consecutive non-empty paragraphs form one section; blank paragraphs
    act as separators. Returns (sections, None) or (None, error_message).
    """
    if not HAS_DOCX:
        return None, "python-docx ์์"
    try:
        document = DocxDocument(file_path)
        sections, buffer = [], []
        for paragraph in document.paragraphs:
            stripped = paragraph.text.strip()
            if stripped:
                buffer.append(stripped)
            elif buffer:
                # Blank paragraph closes the current section.
                sections.append('\n'.join(buffer))
                buffer = []
        if buffer:
            sections.append('\n'.join(buffer))
        return (sections, None) if sections else (None, "DOCX ํ ์คํธ ์์")
    except Exception as e:
        return None, f"DOCX ์ค๋ฅ: {e}"
def extract_text_from_txt(file_path):
    """Read a plain-text file (TXT/MD/CSV...), trying several encodings.

    Splits the text on blank lines into sections. Returns (sections, None)
    on success, or (None, error_message) when no encoding yields content.
    """
    for enc in ['utf-8', 'euc-kr', 'cp949', 'utf-16', 'latin-1']:
        try:
            with open(file_path, 'r', encoding=enc) as f:
                text = f.read()
        # Narrowed from a bare except: only decode/IO failures should move
        # on to the next encoding; anything else would be a real bug.
        except (OSError, UnicodeError, ValueError):
            continue
        if text.strip():
            sections = [s.strip() for s in re.split(r'\n{2,}', text) if s.strip()]
            return sections if sections else [text], None
    return None, "ํ ์คํธ ์ธ์ฝ๋ฉ ์คํจ"
def extract_text_from_hwpx(file_path):
    """Extract text from an HWPX (Hangul 2007+) archive.

    HWPX is a zip of XML parts; body text lives in Contents/section*.xml.
    Returns (list_of_section_texts, None) on success, (None, error) otherwise.
    """
    try:
        text_parts = []
        with zipfile.ZipFile(file_path, 'r') as zf:
            file_list = zf.namelist()
            # Standard layout: Contents/section0.xml, section1.xml, ...
            section_files = sorted([f for f in file_list if f.startswith('Contents/section') and f.endswith('.xml')])
            if not section_files:
                # Fallback: any XML part whose name mentions "section".
                section_files = sorted([f for f in file_list if 'section' in f.lower() and f.endswith('.xml')])
            for sf_name in section_files:
                try:
                    with zf.open(sf_name) as sf:
                        content = sf.read().decode('utf-8', errors='ignore')
                        # Strip namespace declarations and tag prefixes so
                        # elements can be matched on their bare local names.
                        content = re.sub(r'\sxmlns[^"]*"[^"]*"', '', content)
                        content = re.sub(r'<[a-zA-Z]+:', '<', content)
                        content = re.sub(r'</[a-zA-Z]+:', '</', content)
                        try:
                            root = ET.fromstring(content)
                            texts = []
                            for elem in root.iter():
                                # <t> elements carry the actual run text.
                                if elem.tag.endswith('t') or elem.tag == 't':
                                    if elem.text: texts.append(elem.text)
                                elif elem.text and elem.text.strip():
                                    # Heuristic: other text-bearing tags.
                                    if any(x in elem.tag.lower() for x in ['text', 'run', 'para', 'char']):
                                        texts.append(elem.text.strip())
                            if texts: text_parts.append(' '.join(texts))
                        except ET.ParseError:
                            # XML still malformed after the rewrites: fall back
                            # to scraping raw text between tags with a regex.
                            matches = re.findall(r'>([^<]+)<', content)
                            clean = [t.strip() for t in matches if t.strip() and len(t.strip()) > 1]
                            if clean: text_parts.append(' '.join(clean))
                except: continue  # best-effort: skip unreadable section parts
        if text_parts:
            return text_parts, None
        return None, "HWPX ํ ์คํธ ์์"
    except zipfile.BadZipFile:
        return None, "์ ํจํ์ง ์์ HWPX"
    except Exception as e:
        return None, f"HWPX ์ค๋ฅ: {e}"
| def _decode_hwp_para(data): | |
| """HWP ๋ฌธ๋จ ๋์ฝ๋ฉ""" | |
| result = [] | |
| i = 0 | |
| while i < len(data) - 1: | |
| code = int.from_bytes(data[i:i+2], 'little') | |
| if code in (1,2,3): i += 14 | |
| elif code == 9: result.append('\t') | |
| elif code in (10,13): result.append('\n') | |
| elif code == 24: result.append('-') | |
| elif code in (30,31): result.append(' ') | |
| elif code >= 32: | |
| try: | |
| ch = chr(code) | |
| if ch.isprintable() or ch in '\n\t ': result.append(ch) | |
| except: pass | |
| i += 2 | |
| text = ''.join(result).strip() | |
| text = re.sub(r'[ \t]+', ' ', text) | |
| text = re.sub(r'\n{3,}', '\n\n', text) | |
| return text if len(text) > 2 else None | |
def _extract_hwp_section(data):
    """Walk HWP record headers in *data*, decoding paragraph-text records.

    Record header dword packs tag (10 bits), level, and size (12 bits);
    size 0xFFF means the real size follows in the next dword. Tag 67 is
    paragraph text. Returns joined text or None when nothing decodes.
    """
    collected = []
    offset = 0
    total = len(data)
    while offset < total - 4:
        try:
            header = int.from_bytes(data[offset:offset + 4], 'little')
            tag = header & 0x3FF
            size = (header >> 20) & 0xFFF
            offset += 4
            if size == 0xFFF:
                # Extended size: actual length is stored in the next dword.
                if offset + 4 > total:
                    break
                size = int.from_bytes(data[offset:offset + 4], 'little')
                offset += 4
            if offset + size > total:
                break
            payload = data[offset:offset + size]
            offset += size
            if tag == 67 and size > 0:
                decoded = _decode_hwp_para(payload)
                if decoded:
                    collected.append(decoded)
        except:
            offset += 1  # resync one byte at a time on a malformed record
    return '\n'.join(collected) if collected else None
def extract_text_from_hwp(file_path):
    """Extract text from a legacy HWP (5.0, OLE compound) document.

    Returns (list_of_section_texts, None) on success, (None, error) otherwise.
    """
    if not HAS_OLEFILE:
        return None, "olefile ์์"
    try:
        ole = olefile.OleFileIO(file_path)
        try:
            if not ole.exists('FileHeader'):
                return None, "HWP ํค๋ ์์"
            header_data = ole.openstream('FileHeader').read()
            # Bit 0 of byte 36 of FileHeader flags raw-deflate compression of
            # the body streams; assume compressed when the header is short.
            is_compressed = (header_data[36] & 1) == 1 if len(header_data) > 36 else True
            all_texts = []
            for entry in ole.listdir():
                entry_path = '/'.join(entry)
                # HWP 5.0 stores body text in BodyText/Section0..n (ViewText
                # for distribution documents). The previous filter matched only
                # '*_content.xml', which these streams never use, so extraction
                # always returned nothing; the old suffix is kept as fallback.
                if 'Section' not in entry_path:
                    continue
                if not ('BodyText' in entry_path or 'ViewText' in entry_path
                        or entry_path.endswith('_content.xml')):
                    continue
                try:
                    with ole.openstream(entry) as stream:
                        content = stream.read()
                    if is_compressed:
                        try:
                            content = zlib.decompress(content, -zlib.MAX_WBITS)
                        except zlib.error:
                            pass  # stream may actually be stored uncompressed
                    t = _extract_hwp_section(content)
                    if t:
                        all_texts.append(t)
                except Exception:
                    pass  # best-effort: skip unreadable streams
            if all_texts:
                return all_texts, None
            return None, "HWP ํ ์คํธ ์์"
        finally:
            ole.close()  # previously leaked when an exception escaped
    except Exception as e:
        return None, f"HWP ์ค๋ฅ: {e}"
def extract_file_text_api(file_obj):
    """Dispatch an uploaded file object to the extractor for its extension.

    Unknown extensions fall back to the plain-text extractor. Returns the
    joined text, or a warning/error string on failure.
    """
    if not file_obj:
        return ""
    path = Path(file_obj.name)
    # Extension -> extractor dispatch table; default is plain text.
    extractors = {
        '.pdf': extract_text_from_pdf,
        '.docx': extract_text_from_docx,
        '.txt': extract_text_from_txt,
        '.md': extract_text_from_txt,
        '.csv': extract_text_from_txt,
        '.hwpx': extract_text_from_hwpx,
        '.hwp': extract_text_from_hwp,
    }
    extractor = extractors.get(path.suffix.lower(), extract_text_from_txt)
    texts, error = extractor(str(path))
    if error:
        return f"โ ๏ธ {error}"
    return '\n\n'.join(texts) if texts else "ํ ์คํธ ์ถ์ถ ์คํจ"
| # ============================================ | |
| # ๊ธฐ๋ณธ ํ ์คํธ ์ฒ๋ฆฌ | |
| # ============================================ | |
def split_sentences(text):
    """Split *text* into sentences on terminal punctuation (., !, ?).

    Whitespace is normalized first; empty fragments are dropped.
    """
    normalized = re.sub(r'\s+', ' ', text).strip()
    fragments = re.split(r'[.!?]+(?=\s|$)', normalized)
    return [fragment.strip() for fragment in fragments if fragment.strip()]
def split_words(text):
    """Tokenize *text* into runs of word characters per the class below."""
    # findall on a '+' pattern never yields empty strings, so no filter needed.
    return re.findall(r'[๊ฐ-ํฃa-zA-Z0-9]+', text)
| # ============================================ | |
| # HTTP ํฌํผ | |
| # ============================================ | |
def http_get(url, headers=None, timeout=10):
    """Best-effort HTTP GET.

    Returns the response body on HTTP 200, or None on any failure
    (httpx unavailable, network error, non-200 status).
    """
    if not HAS_HTTPX:
        return None
    try:
        r = httpx.get(url, headers=headers, timeout=timeout)
        return r.text if r.status_code == 200 else None
    # Narrowed from a bare except, which also swallowed KeyboardInterrupt
    # and SystemExit.
    except Exception:
        return None
| # ============================================ | |
| # ์น ๊ฒ์ ํจ์๋ค | |
| # ============================================ | |
def brave_search(query, count=5):
    """Query the Brave Search web API.

    Returns a list of {title, url, snippet, source} dicts; empty list when
    BRAVE_API_KEY is unset or any error occurs.
    """
    import urllib.parse
    brave_key = os.getenv("BRAVE_API_KEY", "")
    if not brave_key:
        return []
    # URL-encode the query: raw spaces/&/# previously corrupted the URL.
    q = urllib.parse.quote(query)
    url = f"https://api.search.brave.com/res/v1/web/search?q={q}&count={count}"
    try:
        if HAS_HTTPX:
            r = httpx.get(url, headers={"X-Subscription-Token": brave_key, "Accept": "application/json"}, timeout=10)
            if r.status_code == 200:
                data = r.json()
                results = []
                for item in data.get("web", {}).get("results", []):
                    results.append({"title": item.get("title", ""), "url": item.get("url", ""), "snippet": item.get("description", ""), "source": "Brave"})
                return results
    except Exception:
        pass  # best-effort: network/JSON failures yield an empty result list
    return []
def search_kci(query):
    """Search KCI (Korea Citation Index) articles by title; up to 3 results."""
    import urllib.parse
    try:
        # URL-encode the query: raw spaces/& previously corrupted the URL.
        q = urllib.parse.quote(query)
        url = f"https://open.kci.go.kr/po/openapi/openApiSearch.kci?apiCode=articleSearch&title={q}&displayCount=3"
        resp = http_get(url, timeout=8)
        if resp:
            results = []
            # Pull title/url pairs out of the XML CDATA blocks.
            for m in re.finditer(r'<article-title><!\[CDATA\[(.+?)\]\]></article-title>.*?<url><!\[CDATA\[(.+?)\]\]></url>', resp, re.S):
                results.append({"title": m.group(1), "url": m.group(2), "snippet": "", "source": "KCI"})
            return results[:3]
    except Exception:
        pass  # best-effort: any failure yields an empty result list
    return []
def search_riss(query):
    """Scrape RISS search results; returns up to 3 {title, url, ...} dicts."""
    import urllib.parse
    results = []
    try:
        # URL-encode the query: raw spaces/& previously corrupted the URL.
        q = urllib.parse.quote(query)
        url = f"http://www.riss.kr/search/Search.do?isDetailSearch=N&searchGubun=true&viewYn=OP&queryText=&strQuery={q}&iStartCount=0&iGroupView=5&icate=all"
        resp = http_get(url, timeout=8)
        if resp:
            for m in re.finditer(r'class="title"[^>]*>.*?<a[^>]*href="([^"]+)"[^>]*>(.*?)</a>', resp, re.S):
                title = re.sub(r'<[^>]+>', '', m.group(2)).strip()  # strip inner HTML tags
                if title:
                    results.append({"title": title, "url": "https://www.riss.kr" + m.group(1), "snippet": "", "source": "RISS"})
    except Exception:
        pass  # best-effort: any failure yields whatever was collected
    return results[:3]
def search_arxiv(query):
    """Query the arXiv Atom API and return up to 3 result dicts."""
    import urllib.parse
    hits = []
    try:
        encoded = urllib.parse.quote(query)
        url = f"https://export.arxiv.org/api/query?search_query=all:{encoded}&start=0&max_results=3&sortBy=relevance"
        body = http_get(url, timeout=12)
        if body:
            entry_pattern = r'<entry>.*?<title>(.*?)</title>.*?<id>(.*?)</id>.*?<summary>(.*?)</summary>'
            for match in re.finditer(entry_pattern, body, re.S):
                title = re.sub(r'\s+', ' ', match.group(1)).strip()
                summary = re.sub(r'\s+', ' ', match.group(3)).strip()[:150]
                hits.append({"title": title, "url": match.group(2).strip(), "snippet": summary, "source": "arXiv"})
    except: pass
    return hits[:3]
def duckduckgo_search(query, max_results=5):
    """Scrape the DuckDuckGo HTML endpoint for up to *max_results* hits."""
    import urllib.parse
    found = []
    try:
        encoded = urllib.parse.quote(query)
        page = http_get(
            f"https://html.duckduckgo.com/html/?q={encoded}",
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"},
            timeout=10,
        )
        if page:
            result_pattern = r'<a[^>]+class="result__a"[^>]+href="([^"]+)"[^>]*>(.*?)</a>.*?<a[^>]+class="result__snippet"[^>]*>(.*?)</a>'
            for match in re.finditer(result_pattern, page, re.S):
                href = match.group(1)
                title = re.sub(r'<[^>]+>', '', match.group(2)).strip()
                snippet = re.sub(r'<[^>]+>', '', match.group(3)).strip()
                # DDG wraps targets in a redirect; unwrap the uddg= parameter.
                real_url = href
                if 'uddg=' in href:
                    wrapped = re.search(r'uddg=([^&]+)', href)
                    if wrapped:
                        real_url = urllib.parse.unquote(wrapped.group(1))
                if title:
                    found.append({"title": title, "url": real_url, "snippet": snippet, "source": "Web"})
                    if len(found) >= max_results:
                        break
    except: pass
    return found
def self_crawl_search(query, max_results=3):
    """Run a DuckDuckGo crawl, adding an academic-flavored follow-up query
    when the original query does not already look academic."""
    combined = list(duckduckgo_search(query, max_results))
    looks_academic = '๋ ผ๋ฌธ' in query or 'paper' in query.lower()
    if not looks_academic:
        combined += duckduckgo_search(f"{query} ๋ ผ๋ฌธ ํ์ ", 2)
    return combined
def parallel_brave_search(queries, max_workers=10):
    """Run brave_search over *queries* concurrently (3 results each).

    Returns {query: results_list}; a failed query maps to [].
    Worker count is capped at 20.
    """
    all_results = {}
    with ThreadPoolExecutor(max_workers=min(max_workers, 20)) as executor:
        futures = {executor.submit(brave_search, q, 3): q for q in queries}
        for future in as_completed(futures):
            q = futures[future]
            try:
                all_results[q] = future.result()
            # Narrowed from a bare except, which also caught KeyboardInterrupt.
            except Exception:
                all_results[q] = []
    return all_results