|
from settings import char_remove |
|
import re |
|
import json |
|
import sys |
|
import logging |
|
|
|
class Logger:
    """Tee-style stream that mirrors everything written to it into a log file.

    Each ``write`` is forwarded both to the ``sys.stdout`` object that was
    active when the instance was created and to the file at ``filename``.
    The class exposes ``write``, ``flush`` and ``isatty`` so an instance can
    be assigned to ``sys.stdout``.
    """

    def __init__(self, filename):
        """Capture the current ``sys.stdout`` and start a fresh log file.

        Args:
            filename: Path of the log file. Existing content is discarded.
        """
        self.terminal = sys.stdout
        # Start from an empty file, as a plain "w" open would.
        with open(filename, "w", encoding="utf-8"):
            pass
        # Keep the long-lived handle in append mode: append writes always go
        # to the current end of file, so if the file is truncated through
        # another handle while this one is open (clear_logs in this module
        # does exactly that), the next write does not land at a stale offset
        # and pad the file with NUL bytes. UTF-8 is explicit because
        # read_logs decodes the same file as UTF-8.
        self.log = open(filename, "a", encoding="utf-8")

    def write(self, message):
        """Write ``message`` to both the original stdout and the log file."""
        self.terminal.write(message)
        self.log.write(message)

    def flush(self):
        """Flush both underlying streams."""
        self.terminal.flush()
        self.log.flush()

    def isatty(self):
        """Always report that this stream is not a terminal."""
        return False
|
|
|
# Replace the process-wide stdout with a Logger so that everything printed
# is also written to output.log.
sys.stdout = Logger("output.log")

# Send records emitted through the root `logging` logger (INFO and above)
# to app.log, each prefixed with a timestamp and the level name.
logging.basicConfig(
    filename="app.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
|
|
|
def remove_na(string):
    """Return ``string`` with every entry of ``char_remove`` deleted.

    ``char_remove`` is imported from ``settings``; each of its items is
    removed in turn with ``str.replace``.
    """
    cleaned = string
    for unwanted in char_remove:
        cleaned = cleaned.replace(unwanted, "")
    return cleaned
|
|
|
def save_json(text, filename):
    """Serialize ``text`` as JSON into ``<filename>.json``.

    The file is written as UTF-8 and non-ASCII characters are kept as-is
    rather than escaped.

    Args:
        text: Any JSON-serializable object.
        filename: Target path without the ``.json`` suffix.

    Returns:
        The path that was written, including the ``.json`` suffix.
    """
    target = filename + ".json"
    with open(target, "w", encoding="utf-8") as handle:
        json.dump(text, handle, ensure_ascii=False)
    return target
|
|
|
def format_polygon(polygon):
    """Render a sequence of points as ``"[x, y], [x, y], ..."``.

    Args:
        polygon: Iterable of objects exposing ``x`` and ``y`` attributes,
            or a falsy value.

    Returns:
        The formatted string, or ``"N/A"`` when ``polygon`` is falsy.
    """
    if polygon:
        vertices = [f"[{point.x}, {point.y}]" for point in polygon]
        return ", ".join(vertices)
    return "N/A"
|
|
|
def filter_tables(input_string, table_numbers):
    """Extract selected tables from a text dump and return them as JSON.

    ``input_string`` is split on ``"Table # <n>"`` headers; anything before
    the first header is discarded. Inside a table section every
    ``Cell[<row>][<col>] has content '<text>'`` entry is collected and
    placed into a row-major grid sized by the largest row and column index
    seen. Positions with no matching cell hold an empty string.

    Args:
        input_string: Text containing the table headers and cell entries.
        table_numbers: Iterable of indexes into the list of table sections
            in order of appearance (index 0 is the first section,
            whatever number its header carries).

    Returns:
        A JSON string mapping ``"table_1"``, ``"table_2"``, ... (numbered in
        the order of ``table_numbers``) to the grid of each selected table.
        A section without any cell entry maps to an empty list.

    Raises:
        IndexError: If an entry of ``table_numbers`` has no matching
            section. The error is logged and printed before re-raising.
    """
    sections = re.split(r"Table # \d+", input_string)[1:]

    json_tables = {}
    for position, table_number in enumerate(table_numbers, start=1):
        try:
            section = sections[table_number]
        except IndexError as e:
            logging.error(f"Error: {e}, Please check document configuration or document type")
            print(f"Error: {e}, Please check document configuration or document type")
            raise

        cells = [
            (int(row), int(col), content)
            for row, col, content in re.findall(
                r"Cell\[(\d+)\]\[(\d+)\] has content '(.*?)'", section
            )
        ]

        # default=-1 gives a 0 x 0 grid for a section without cells instead
        # of letting max() fail on an empty sequence.
        num_rows = max((row for row, _, _ in cells), default=-1) + 1
        num_cols = max((col for _, col, _ in cells), default=-1) + 1

        grid = [["" for _ in range(num_cols)] for _ in range(num_rows)]
        for row, col, content in cells:
            grid[row][col] = content

        json_tables[f"table_{position}"] = grid

    return json.dumps(json_tables)
|
|
|
def extract_text_within_range(input_string, x_range, y_range):
    """Return the text of every line whose polygon touches a rectangle.

    ``input_string`` is scanned for entries of the form
    ``Line # <n> text '<text>' within bounding polygon
    '[x, y], [x, y], [x, y], [x, y]'``. A line is selected when at least
    one of its four vertices lies inside the closed rectangle described by
    ``x_range`` and ``y_range``.

    Args:
        input_string: Text containing the line entries.
        x_range: Indexable ``(low, high)`` bounds for x, inclusive.
        y_range: Indexable ``(low, high)`` bounds for y, inclusive.

    Returns:
        List of the matching line texts, in order of appearance.

    Raises:
        SyntaxError: If a captured coordinate is not a valid number
            literal (for example ``1.2.3``).
    """
    # Local import keeps this function self-contained.
    import ast

    pattern = r"Line # \d+ text '([^']*)' within bounding polygon '(\[[\d.]+, [\d.]+\], \[[\d.]+, [\d.]+\], \[[\d.]+, [\d.]+\], \[[\d.]+, [\d.]+\])'"

    selected = []
    for text, polygon_str in re.findall(pattern, input_string):
        # The polygon comes from the scanned text, so it is parsed as a
        # literal only (never executed); this yields a tuple of [x, y] lists.
        vertices = ast.literal_eval(polygon_str)
        if any(
            x_range[0] <= x <= x_range[1] and y_range[0] <= y <= y_range[1]
            for x, y in vertices
        ):
            selected.append(text)

    return selected
|
|
|
def merge_strings(input_string, input_coords, extract_coords):
    """Overlay coordinate entries on ``key: value`` lines and resolve them.

    Both ``input_string`` and ``input_coords`` are read as newline-separated
    ``key: value`` lines. For every key of ``input_string`` that also
    appears in ``input_coords``, the value is replaced by the one from
    ``input_coords``. Afterwards every value that starts with ``"("`` is
    parsed as a literal pair of ``(x, y)`` points; the text found by
    ``extract_text_within_range`` inside the resulting x/y ranges replaces
    the value, joined with ``", "``, or ``"-||-"`` if nothing was found.

    Args:
        input_string: ``key: value`` lines; every line must contain
            ``": "`` (blank lines are not skipped).
        input_coords: ``key: value`` lines; blank lines are ignored and
            surrounding whitespace is stripped.
        extract_coords: Text handed to ``extract_text_within_range``.

    Returns:
        The merged ``key: value`` lines joined with newlines.

    Raises:
        IndexError: If a line has no ``": "`` separator. The error is
            logged and printed before re-raising.
        ValueError, SyntaxError: If a value starting with ``"("`` is not a
            valid Python literal.
    """
    # Local import keeps this function self-contained.
    import ast

    def _parse_pairs(lines):
        """Build a dict from ``key: value`` lines; IndexError if malformed."""
        pairs = {}
        for line in lines:
            # Split once only, so a value that itself contains ": " is kept
            # whole instead of being cut at its first ": ".
            parts = line.split(": ", 1)
            pairs[parts[0]] = parts[1]
        return pairs

    field_lines = input_string.split('\n')
    coord_lines = [line.strip() for line in input_coords.split('\n') if line.strip()]

    try:
        merged = _parse_pairs(field_lines)
        overrides = _parse_pairs(coord_lines)
    except IndexError as e:
        logging.error(f"Error: {e}, Please check document configuration or document type")
        print(f"Error: {e}, Please check document configuration or document type")
        raise

    # Only keys already present in input_string are overridden.
    for key in merged:
        if key in overrides:
            merged[key] = overrides[key]

    for key, value in merged.items():
        if value.startswith('('):
            # Values come from configuration text: parse them as literals
            # only, never execute them.
            corners = ast.literal_eval(value)
            x_range = (corners[0][0], corners[1][0])
            y_range = (corners[0][1], corners[1][1])
            found = extract_text_within_range(extract_coords, x_range, y_range)
            merged[key] = ', '.join(found) if found else '-||-'

    return '\n'.join(f"{key}: {value}" for key, value in merged.items())
|
|
|
def read_logs(filename="output.log", max_lines=100):
    """Return the tail of a log file as a single string.

    ``sys.stdout`` is flushed first so that output still sitting in its
    buffer reaches the file before it is read.

    Args:
        filename: Path of the log file. Defaults to ``"output.log"``.
        max_lines: Maximum number of trailing lines to return. Defaults to
            100. Must not be negative.

    Returns:
        The last ``max_lines`` lines of the file, concatenated with their
        original line endings.
    """
    # Local import keeps this function self-contained.
    from collections import deque

    sys.stdout.flush()
    # errors="replace": a stray undecodable byte in the log should not make
    # the log unreadable.
    with open(filename, "r", encoding="utf-8", errors="replace") as f:
        # A bounded deque keeps only the tail while streaming the file,
        # instead of materializing every line first.
        tail = deque(f, maxlen=max_lines)
    return ''.join(tail)
|
|
|
def clear_logs():
    """Empty ``output.log`` in the current working directory.

    The file is created if it does not exist yet.
    """
    # Opening in "w" mode truncates the file; the empty write adds nothing.
    with open("output.log", "w", encoding="utf-8") as handle:
        handle.write("")