abjasrees committed on
Commit
b444f65
·
verified ·
1 Parent(s): fe4c6e7

Create file_process.py

Browse files
Files changed (1) hide show
  1. file_process.py +90 -0
file_process.py ADDED
@@ -0,0 +1,90 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # file_process.py
2
+ import pathlib
3
+ from typing import Union
4
+ from pypdf import PdfReader
5
+ from docx import Document
6
+
7
def _read_file_by_ext(p: pathlib.Path) -> str:
    """Extract the text content of *p*, choosing a reader from its suffix.

    The suffix is compared case-insensitively.

    * ``.txt``  -- decoded as UTF-8; undecodable bytes are dropped.
    * ``.docx`` -- the text of every paragraph, joined with newlines.
    * ``.pdf``  -- the extracted text of every page, joined with newlines;
      pages that yield no text are skipped.

    Raises:
        ValueError: if the suffix is none of the three supported ones.
    """
    suffix = p.suffix.lower()

    # Reject anything we have no reader for before touching the file.
    if suffix not in (".txt", ".docx", ".pdf"):
        raise ValueError(f"Unsupported file extension: {suffix}. Use .txt / .docx / .pdf.")

    if suffix == ".txt":
        return p.read_text(encoding="utf-8", errors="ignore")

    if suffix == ".docx":
        paragraphs = Document(str(p)).paragraphs
        return "\n".join([para.text for para in paragraphs])

    # Only ".pdf" is left at this point.
    page_texts = [page.extract_text() for page in PdfReader(str(p)).pages]
    return "\n".join([text for text in page_texts if text])
23
+
24
def load_input_text(input_arg: Union[str, pathlib.Path]) -> str:
    """Return the text designated by *input_arg*.

    *input_arg* is either the text itself or the path of a ``.txt`` /
    ``.docx`` / ``.pdf`` file holding it:

    * A ``pathlib.Path`` is always treated as a candidate file.
    * A string that contains a line break or is longer than 512 characters
      is taken to be the text itself and returned unchanged, without
      touching the filesystem.
    * Any other string is treated as a candidate file.

    A candidate that names an existing regular file is read according to
    its extension. A candidate that does not (missing, a directory, or the
    probe itself failed with ``OSError`` such as Errno 36 "name too long")
    is returned as a plain string.

    Args:
        input_arg: Raw text or a path to a supported file.

    Returns:
        The file's extracted text, or ``str(input_arg)`` when it is not a file.

    Raises:
        ValueError: if *input_arg* is ``None``, or if it names an existing
            file whose extension is not supported.
        OSError: if the file exists but cannot be read.
    """
    if input_arg is None:
        raise ValueError("input_arg is required")

    raw = str(input_arg)
    if isinstance(input_arg, pathlib.Path):
        candidate = input_arg
    else:
        # Multi-line or very long strings are certainly not paths; skip the
        # filesystem probe entirely for them.
        if "\n" in raw or "\r" in raw or len(raw) > 512:
            return raw
        candidate = pathlib.Path(raw)

    # is_file() rather than exists(): a short string such as "" (which
    # pathlib maps to ".") or "tmp" may name a directory, and that must be
    # treated as text instead of being handed to the extension reader.
    # Only the probe is guarded, so genuine read failures are not silently
    # replaced by the path string.
    try:
        is_regular_file = candidate.is_file()
    except OSError:
        return raw

    if not is_regular_file:
        return raw
    return _read_file_by_ext(candidate)
53
+
54
+ def prepare_input_arg(text_value: str | None, file_obj) -> str:
55
+ """
56
+ Combine textbox text and a single uploaded file (.txt/.docx/.pdf).
57
+ If both present, concatenate into a temp text file and return its path.
58
+ Compatible with Gradio/Scripts where file_obj may have a .name attribute or be a dict.
59
+ """
60
+ text = (text_value or "").strip()
61
+ if file_obj is None and not text:
62
+ raise ValueError("Provide either text or upload a .txt/.docx/.pdf")
63
+
64
+ # If only text
65
+ if file_obj is None:
66
+ return text
67
+
68
+ # Best-effort path extraction
69
+ if hasattr(file_obj, "name") and isinstance(file_obj.name, str):
70
+ up_path = pathlib.Path(file_obj.name)
71
+ elif isinstance(file_obj, dict) and "name" in file_obj:
72
+ up_path = pathlib.Path(file_obj["name"])
73
+ else:
74
+ # As a fallback, write bytes if available
75
+ data = getattr(file_obj, "read", None)
76
+ if callable(data):
77
+ content = file_obj.read()
78
+ up_path = pathlib.Path("/tmp/upload.bin")
79
+ up_path.write_bytes(content)
80
+ else:
81
+ raise ValueError("Unsupported uploaded file object; missing .name or .read()")
82
+
83
+ if text:
84
+ tmp = pathlib.Path("/tmp/_concat_input.txt")
85
+ tmp.write_text(text + "\n\n", encoding="utf-8")
86
+ appended = load_input_text(str(up_path))
87
+ tmp.write_text(tmp.read_text(encoding="utf-8") + appended, encoding="utf-8")
88
+ return str(tmp)
89
+
90
+ return str(up_path)