import os
from dotenv import load_dotenv
load_dotenv()
import json
import pandas as pd
import zipfile
import xml.etree.ElementTree as ET
from io import BytesIO
import openpyxl
from openai import OpenAI
import re
from pydantic import BaseModel, Field, ValidationError, RootModel
from typing import List, Optional
from fuzzywuzzy import fuzz
from fuzzywuzzy import process
HF_API_KEY = os.getenv("HF_API_KEY")
# Deepseek R1 Distilled Qwen 2.5 14B --------------------------------
# base_url = "https://router.huggingface.co/novita"
# model = "deepseek/deepseek-r1-distill-qwen-14b"
# Deepseek R1 Distilled Qwen 2.5 32B --------------------------------
# base_url = "https://router.huggingface.co/hf-inference/models/deepseek-ai/DeepSeek-R1-Distill-Qwen-32B/v1"
# model = "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B"
# Qwen 2.5 7B --------------------------------------------------------
base_url = "https://router.huggingface.co/together/v1"
model= "Qwen/Qwen2.5-7B-Instruct-Turbo"
# Qwen 2.5 32B --------------------------------------------------------
# base_url = "https://router.huggingface.co/novita/v3/openai"
# model="qwen/qwen-2.5-72b-instruct"
# Qwen 3 32B --------------------------------------------------------
# base_url = "https://router.huggingface.co/sambanova/v1"
# model="Qwen3-32B"
# Default Word XML namespace
DEFAULT_NS = {'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}
NS = None # Global variable to store the namespace
def get_namespace(root):
"""Extracts the primary namespace from the XML root element while keeping the default."""
global NS
ns = root.tag.split('}')[0].strip('{')
NS = {'w': ns} if ns else DEFAULT_NS
return NS
# --- Helper Functions for DOCX Processing ---
def extract_text_from_cell(cell):
"""Extracts text from a Word table cell, preserving line breaks and reconstructing split words."""
paragraphs = cell.findall('.//w:p', NS)
lines = []
for paragraph in paragraphs:
# Get all text runs and concatenate their contents
text_runs = [t.text for t in paragraph.findall('.//w:t', NS) if t.text]
line = ''.join(text_runs).strip() # Merge split words properly
if line: # Add only non-empty lines
lines.append(line)
return lines # Return list of lines to preserve line breaks
def clean_spaces(text):
r"""
Removes excessive spaces between Chinese characters while preserving spaces in English words.
Also normalizes multiple spaces to single space and ensures one space between Chinese and English.
"""
if not text or not isinstance(text, str):
return text
# Remove spaces between Chinese characters
text = re.sub(r'([\u4e00-\u9fff])\s+([\u4e00-\u9fff])', r'\1\2', text)
# Ensure one space between Chinese and English
text = re.sub(r'([\u4e00-\u9fff])\s*([a-zA-Z])', r'\1 \2', text)
text = re.sub(r'([a-zA-Z])\s*([\u4e00-\u9fff])', r'\1 \2', text)
# Normalize multiple spaces to single space
text = re.sub(r'\s+', ' ', text)
return text.strip()
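# Illustrative example (hypothetical text):
# clean_spaces("采 购Purchase  Order") -> "采购 Purchase Order"
# Spaces inside the Chinese run are removed, exactly one space is kept at the
# Chinese/English boundary, and repeated spaces are collapsed.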
def extract_key_value_pairs(text, target_dict=None):
"""
Extracts multiple key-value pairs from a given text.
- First, split by more than 3 spaces (`\s{3,}`) **only if the next segment contains a `:`.**
- Then, process each segment by splitting at `:` to correctly assign keys and values.
"""
if target_dict is None:
target_dict = {}
text = text.replace(":", ":") # Normalize Chinese colons to English colons
# Step 1: Check if splitting by more than 3 spaces is necessary
segments = re.split(r'(\s{3,})', text) # Use raw string to prevent invalid escape sequence
# Step 2: Process each segment, ensuring we only split if the next part has a `:`
merged_segments = []
temp_segment = ""
for segment in segments:
if ":" in segment: # If segment contains `:`, it's a valid split point
if temp_segment:
merged_segments.append(temp_segment.strip())
temp_segment = ""
merged_segments.append(segment.strip())
else:
temp_segment += " " + segment.strip()
if temp_segment:
merged_segments.append(temp_segment.strip())
# Step 3: Extract key-value pairs correctly
for segment in merged_segments:
if ':' in segment:
key, value = segment.split(':', 1) # Only split at the first colon
key, value = key.strip(), value.strip() # Clean spaces
if key in target_dict:
target_dict[key] += "\n" + value # Append if key already exists
else:
target_dict[key] = value
return target_dict
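# Illustrative example (hypothetical line):
# extract_key_value_pairs("合同编号: GN-001     交货地点: Conakry")
# -> {"合同编号": "GN-001", "交货地点": "Conakry"}
# The run of 3+ spaces separates the two pairs, each segment is split at its
# first colon, and a repeated key has its value appended on a new line.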
# --- Table Processing Functions ---
def process_unknown_table(rows):
"""Processes unknown tables and returns the extracted lines as a list."""
unknown_table_data = []
for row in rows:
cells = row.findall('.//w:tc', NS)
if len(cells) == 1:
cell_lines = extract_text_from_cell(cells[0]) # Extract all lines from the cell
# Append each line directly to the list without splitting
unknown_table_data.extend(cell_lines)
return unknown_table_data # Return the list of extracted lines
def process_buyer_seller_table(rows):
"""Processes a two-column buyer-seller table into a structured dictionary using the first row as keys."""
headers = [extract_text_from_cell(cell) for cell in rows[0].findall('.//w:tc', NS)]
if len(headers) != 2:
return None # Not a buyer-seller table
# determine role based on header text
def get_role(header_text, default_role):
header_text = header_text.lower() # Convert to lowercase
if '买方' in header_text or 'buyer' in header_text or '甲方' in header_text:
return 'buyer_info'
elif '卖方' in header_text or 'seller' in header_text or '乙方' in header_text:
return 'seller_info'
else:
return default_role # Default if no keyword is found
# Determine the keys for buyer and seller columns
buyer_key = get_role(headers[0][0], 'buyer_info')
seller_key = get_role(headers[1][0], 'seller_info')
# Initialize the dictionary using the determined keys
buyer_seller_data = {
buyer_key: {},
seller_key: {}
}
for row in rows:
cells = row.findall('.//w:tc', NS)
if len(cells) == 2:
buyer_lines = extract_text_from_cell(cells[0])
seller_lines = extract_text_from_cell(cells[1])
for line in buyer_lines:
extract_key_value_pairs(line, buyer_seller_data[buyer_key])
for line in seller_lines:
extract_key_value_pairs(line, buyer_seller_data[seller_key])
return buyer_seller_data
def process_summary_table(rows):
"""Processes a two-column summary table where keys are extracted as dictionary keys."""
extracted_data = []
for row in rows:
cells = row.findall('.//w:tc', NS)
if len(cells) == 2:
key = " ".join(extract_text_from_cell(cells[0]))
value = " ".join(extract_text_from_cell(cells[1]))
extracted_data.append({key: value})
return extracted_data
def clean_header_spaces(headers):
"""
Cleans headers for consistent matching by:
1. Normalizing multiple spaces to single space
2. Ensuring exactly one space between Chinese and English
3. Converting to lowercase
"""
if not headers:
return headers
cleaned_headers = []
for header in headers:
if not header:
cleaned_headers.append(header)
continue
# Normalize multiple spaces to single space
header = re.sub(r'\s+', ' ', header)
# Ensure exactly one space between Chinese and English
header = re.sub(r'([\u4e00-\u9fff])\s*([a-zA-Z])', r'\1 \2', header)
header = re.sub(r'([a-zA-Z])\s*([\u4e00-\u9fff])', r'\1 \2', header)
# Final cleanup of any remaining multiple spaces
header = re.sub(r'\s+', ' ', header)
# Convert to lowercase
header = header.lower()
cleaned_headers.append(header.strip())
return cleaned_headers
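# Illustrative example (hypothetical headers):
# clean_header_spaces(["单价  Unit   Price", "数量Quantity"]) -> ["单价 unit price", "数量 quantity"]
# Whitespace is collapsed, one space is enforced at the Chinese/English
# boundary, and the result is lowercased for matching.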
def extract_headers(first_row_cells):
"""Extracts unique column headers from the first row of a table."""
headers = []
header_count = {}
for cell in first_row_cells:
cell_text = " ".join(extract_text_from_cell(cell))
grid_span = cell.find('.//w:gridSpan', NS)
col_span = int(grid_span.attrib.get(f'{{{NS["w"]}}}val', '1')) if grid_span is not None else 1
for _ in range(col_span):
# Ensure header uniqueness by appending an index if repeated
if cell_text in header_count:
header_count[cell_text] += 1
unique_header = f"{cell_text}_{header_count[cell_text]}"
else:
header_count[cell_text] = 1
unique_header = cell_text
headers.append(unique_header if unique_header else f"Column_{len(headers) + 1}")
return headers
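# Illustrative example (hypothetical header row): cells reading
# ["序号 No.", "备注", "备注"] yield the headers ["序号 No.", "备注", "备注_2"];
# a cell carrying gridSpan="2" contributes its text once per spanned column,
# so the repeated copy receives a numeric suffix and the header list stays
# aligned with the physical grid columns.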
def process_long_table(rows):
"""Processes a standard table and correctly handles horizontally merged cells."""
if not rows:
return [] # Avoid IndexError
headers = extract_headers(rows[0].findall('.//w:tc', NS))
table_data = []
vertical_merge_tracker = {}
for row in rows[1:]:
row_data = {}
cells = row.findall('.//w:tc', NS)
running_index = 0
# Skip rows with only 1 or 2 columns (merged cells)
if len(cells) <= 2:
continue
for cell in cells:
cell_text = " ".join(extract_text_from_cell(cell))
# Consistent Namespace Handling for Horizontal Merge
grid_span = cell.find('.//w:gridSpan', NS)
grid_span_val = grid_span.attrib.get(f'{{{NS["w"]}}}val') if grid_span is not None else '1'
col_span = int(grid_span_val)
# Handle vertical merge
v_merge = cell.find('.//w:vMerge', NS)
if v_merge is not None:
v_merge_val = v_merge.attrib.get(f'{{{NS["w"]}}}val')
if v_merge_val == 'restart':
vertical_merge_tracker[running_index] = cell_text
else:
# Repeat the value from the previous row's merged cell
cell_text = vertical_merge_tracker.get(running_index, "")
# Repeat the value for horizontally merged cells
start_col = running_index
end_col = running_index + col_span
# Repeat the value for each spanned column
for col in range(start_col, end_col):
key = headers[col] if col < len(headers) else f"Column_{col+1}"
row_data[key] = cell_text
# Update the running index to the end of the merged cell
running_index = end_col
# Fill remaining columns with empty strings to maintain alignment
while running_index < len(headers):
row_data[headers[running_index]] = ""
running_index += 1
table_data.append(row_data)
# Clean the keys in the table data
cleaned_table_data = []
for row in table_data:
cleaned_row = {}
for key, value in row.items():
# Clean the key using the same function we use for headers
cleaned_key = clean_header_spaces([key])[0]
cleaned_row[cleaned_key] = value
cleaned_table_data.append(cleaned_row)
# Filter out rows where the "序号" column contains non-numeric values
filtered_table_data = []
for row in cleaned_table_data:
# Check if any cell contains "合计" (total), "折扣" (discount), or "明细见附件" (details in attachment)
# But exclude the remarks column from this check
contains_total = False
for key, value in row.items():
# Skip if this is a remarks column
key_lower = key.lower()
if any(term in key_lower for term in ["备注", "remarks", "note", "notes"]):
continue # Skip remarks column
if isinstance(value, str) and ("小计" in value or "总金额" in value or "合计" in value or "折扣" in value or "明细见附件" in value):
contains_total = True
break
if contains_total:
continue
# Check potential serial number columns (currently only the Chinese "序号" header is matched)
serial_number = None
for column in row:
if any(term in column.lower() for term in ["序号"]):
serial_number = row[column]
break
# If we found a serial number column, check if its value is numeric
if serial_number is not None:
# Skip if serial number is empty
if not serial_number.strip():
continue
# Strip any non-numeric characters and check if there's still a value
# This keeps values like "1", "2." etc. but filters out "No." or other text
cleaned_number = re.sub(r'[^\d]', '', serial_number)
if cleaned_number: # If there are any digits left, keep the row
filtered_table_data.append(row)
else:
# If we couldn't find a serial number column, keep the row
filtered_table_data.append(row)
# Remove duplicate columns (ending with _2, _3, etc.)
filtered_table_data = merge_duplicate_columns(filtered_table_data)
return filtered_table_data
def identify_table_type_and_header_row(rows):
"""Identify table type and the index of the header row."""
for i, row in enumerate(rows):
num_cells = len(row.findall('.//w:tc', NS))
if num_cells > 1:
# Check for buyer-seller or summary table based on structure only
if num_cells == 2:
if all(len(r.findall('.//w:tc', NS)) == 2 for r in rows):
# Check if it contains buyer/seller keywords
cell_texts = " ".join([" ".join(extract_text_from_cell(cell)) for cell in row.findall('.//w:tc', NS)])
buyer_seller_keywords = ["买方", "buyer", "卖方", "seller"]
if any(keyword.lower() in cell_texts.lower() for keyword in buyer_seller_keywords):
return "buyer_seller", i
else:
return "unknown", i
else:
return "summary", i
else:
# For tables with more than 2 columns, process as long table
return "long_table", i
# Fallbacks
return "unknown", 0
def extract_tables(root):
"""Extracts tables from the DOCX document and returns structured data, skipping tables whose title contains 'template'.
Handles cases where there are blank paragraphs between the title and the table."""
# Find the document body (usually the first child of root)
body = root.find('.//w:body', NS)
if body is None:
body = root # fallback if structure is different
table_data = {}
table_paragraphs = set()
table_index = 1
last_paragraphs = [] # Store the last few paragraphs (max 3)
# Iterate through direct children of the body
for elem in list(body):
if elem.tag == f'{{{NS["w"]}}}p':
# Keep a rolling list of the last 3 paragraphs
last_paragraphs.append(elem)
if len(last_paragraphs) > 3:
last_paragraphs.pop(0)
elif elem.tag == f'{{{NS["w"]}}}tbl':
# Look back through last_paragraphs for the most recent non-empty one
title = ""
for para in reversed(last_paragraphs):
texts = [t.text for t in para.findall('.//w:t', NS) if t.text]
candidate = ' '.join(texts).strip()
if candidate:
title = candidate
break
# If title contains 'template', skip this table
if title and 'template' in title.lower():
continue
rows = elem.findall('.//w:tr', NS)
if not rows:
continue # Skip empty tables
for paragraph in elem.findall('.//w:p', NS):
table_paragraphs.add(paragraph)
table_type, header_row_index = identify_table_type_and_header_row(rows)
if table_type == "unknown":
unknown_table_data = process_unknown_table(rows)
if unknown_table_data:
table_data[f"table_{table_index}_unknown"] = unknown_table_data
table_index += 1
continue
elif table_type == "buyer_seller":
buyer_seller_data = process_buyer_seller_table(rows[header_row_index:])
if buyer_seller_data:
table_data[f"table_{table_index}_buyer_seller"] = buyer_seller_data
table_index += 1
continue
elif table_type == "summary":
summary_data = process_summary_table(rows[header_row_index:])
if summary_data:
table_data[f"table_{table_index}_summary"] = summary_data
table_index += 1
continue
elif table_type == "long_table":
long_table_data = process_long_table(rows[header_row_index:])
if long_table_data:
table_data[f"long_table_{table_index}"] = long_table_data
table_index += 1
continue
return table_data, table_paragraphs
# --- Non-Table Processing Functions ---
def extract_text_outside_tables(root, table_paragraphs):
"""Extracts text from paragraphs outside tables in the document."""
extracted_text = []
for paragraph in root.findall('.//w:p', NS):
if paragraph in table_paragraphs:
continue # Skip paragraphs inside tables
texts = [t.text.strip() for t in paragraph.findall('.//w:t', NS) if t.text]
line = clean_spaces(' '.join(texts).replace(':',':')) # Clean colons and spaces
if ':' in line:
extracted_text.append(line)
return extracted_text
# --- Main Extraction Functions ---
def extract_docx_as_xml(file_bytes, save_xml=False, xml_filename="document.xml"):
# Ensure file_bytes is at the start position
file_bytes.seek(0)
with zipfile.ZipFile(file_bytes, 'r') as docx:
with docx.open('word/document.xml') as xml_file:
xml_content = xml_file.read().decode('utf-8')
if save_xml:
with open(xml_filename, "w", encoding="utf-8") as f:
f.write(xml_content)
return xml_content
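# Minimal usage sketch (hypothetical path; mirrors the flow in extract_po below):
# with open("contract.docx", "rb") as f:
#     xml_content = extract_docx_as_xml(BytesIO(f.read()))
# get_namespace(ET.fromstring(xml_content))  # must be called before the table extractors use NS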
def xml_to_json(xml_content, save_json=False, json_filename="extracted_data.json"):
tree = ET.ElementTree(ET.fromstring(xml_content))
root = tree.getroot()
table_data, table_paragraphs = extract_tables(root)
extracted_data = table_data
extracted_data["non_table_data"] = extract_text_outside_tables(root, table_paragraphs)
if save_json:
with open(json_filename, "w", encoding="utf-8") as f:
json.dump(extracted_data, f, ensure_ascii=False, indent=4)
return json.dumps(extracted_data, ensure_ascii=False, indent=4)
def extract_contract_summary(json_data, save_json=False, json_filename="contract_summary.json"):
"""Sends extracted JSON data to OpenAI and returns formatted structured JSON."""
# Step 1: Convert JSON string to Python dictionary
contract_data = json.loads(json_data)
# Step 2: Remove keys that contain "long_table"
filtered_contract_data = {key: value for key, value in contract_data.items() if "long_table" not in key}
# Step 3: Convert the filtered data (long tables removed) back to a JSON string for the prompt
json_output = json.dumps(filtered_contract_data, ensure_ascii=False, indent=4)
# Define Pydantic model for contract summary validation
class ContractSummary(BaseModel):
合同编号: Optional[str] = ""
接收人: Optional[str] = ""
Recipient: Optional[str] = ""
接收地: Optional[str] = ""
Place_of_receipt: Optional[str] = Field("", alias="Place of receipt")
供应商: Optional[str] = ""
币种: Optional[str] = ""
供货日期: Optional[str] = ""
base_prompt = """You are given a contract in JSON format. Extract the following information:
# Response Format
Return the extracted information as a structured JSON in the exact format shown below (Note: Do not repeat any keys, if unsure leave the value empty):
{
"合同编号": 如果合同编号出现多次,只需填一个,不要重复,优先填写有"-"的合同编号
"接收人": (注意:不是买家必须是接收人,不是一个公司而是一个人)
"Recipient":
"接收地": (注意:不是交货地点是目的港,只写中文,英文写在 place of receipt)
"Place of receipt": (只写英文, 如果接收地/目的港/Port of destination 有英文可填在这里)
"供应商":
"币种": (主要用的货币,填英文缩写。GNF一般是为了方便而转换出来的, 除非只有GNF,GNF一般不是主要币种。)
"供货日期": (如果合同里有写才填,不要自己推理出日期,必须是一个日期,而不是天数)(格式:YYYY-MM-DD)
}
Contract data in JSON format:""" + f"""
{json_output}"""
messages = [
{
"role": "user",
"content": base_prompt
}
]
# OpenAI-compatible client for the Hugging Face router endpoint configured at the top of this file
client = OpenAI(
base_url=base_url,
api_key=HF_API_KEY,
)
# Try up to 3 times with error feedback
max_retries = 3
for attempt in range(max_retries):
try:
print(f"🔄 LLM attempt {attempt + 1} of {max_retries}")
completion = client.chat.completions.create(
model=model,
messages=messages,
temperature=0.1,
)
think_text = re.findall(r"<think>(.*?)</think>", completion.choices[0].message.content, flags=re.DOTALL)
if think_text:
print(f"🧠 Thought Process: {think_text}")
contract_summary = re.sub(r"<think>.*?</think>\s*", "", completion.choices[0].message.content, flags=re.DOTALL) # Remove think
contract_summary = re.sub(r"^```json\n|```$", "", contract_summary, flags=re.DOTALL) # Remove ```
# Clean up JSON before validation
contract_json = json.loads(contract_summary.strip())
# Clean 合同编号 by removing all contents in brackets including the brackets themselves
if "合同编号" in contract_json and contract_json["合同编号"]:
contract_json["合同编号"] = re.sub(r'[\((].*?[\))]', '', contract_json["合同编号"]).strip()
# Remove anything after "/" (including the "/" itself)
contract_json["合同编号"] = re.sub(r'/\s*.*$', '', contract_json["合同编号"]).strip()
validated_data = ContractSummary.model_validate(contract_json)
# Success! Return validated data
validated_json = json.dumps(validated_data.model_dump(by_alias=True), ensure_ascii=False, indent=4)
if save_json:
with open(json_filename, "w", encoding="utf-8") as f:
f.write(validated_json)
print(f"✅ Successfully validated contract summary on attempt {attempt + 1}")
return json.dumps(validated_json, ensure_ascii=False, indent=4)
except ValidationError as e:
error_msg = f"Validation error: {e}"
print(f"❌ {error_msg}")
except json.JSONDecodeError as e:
error_msg = f"JSON decode error: {e}"
print(f"❌ {error_msg}")
# Don't retry on the last attempt
if attempt < max_retries - 1:
# Add error message to the conversation and retry
messages.append({
"role": "assistant",
"content": completion.choices[0].message.content
})
messages.append({
"role": "user",
"content": f"Your response had the following error: {error_msg}. Please fix the format and provide a valid JSON response with the required fields."
})
# If we get here, all attempts failed - return empty but valid model
print("⚠️ All attempts failed, returning empty model")
empty_data = ContractSummary().model_dump(by_alias=True)
empty_json = json.dumps(empty_data, ensure_ascii=False, indent=4)
if save_json:
with open(json_filename, "w", encoding="utf-8") as f:
f.write(empty_json)
return json.dumps(empty_json, ensure_ascii=False, indent=4)
def find_price_list_table(extracted_data, min_matches=3):
price_keywords = [
"名称", "name", "规格", "specification", "型号", "model", "所属机型", "applicable models",
"单位", "unit", "数量", "quantity", "单价", "unit price", "总价", "amount",
"几郎单价", "unit price(gnf)", "几郎总价", "amount(gnf)", "备注", "remarks", "计划来源", "plan no",
"货描", "commodity",
]
last_price_list_table = None
last_price_list_key = None
# Get all long tables and sort them by key to ensure we process them in order
long_tables = [(key, table) for key, table in extracted_data.items()
if "long_table" in key and isinstance(table, list) and table]
long_tables.sort(key=lambda x: x[0]) # Sort by key to maintain order
for key, table in long_tables:
headers = list(table[0].keys())
match_count = 0
for header in headers:
header_lower = header.lower()
# Use fuzzy matching for keyword detection
for keyword in price_keywords:
if fuzz.partial_ratio(header_lower, keyword.lower()) >= 70:
match_count += 1
break # Found a match for this header, move to next
if match_count >= min_matches:
last_price_list_table = table # Keep the last table that meets criteria
last_price_list_key = key # Keep the key as well
return last_price_list_table, last_price_list_key
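# Illustrative example (hypothetical headers): a long table whose header row
# includes "名称 Name", "数量 Quantity" and "单价(元) Unit Price (CNY)" fuzzy-matches
# the keywords 名称/name, 数量/quantity and 单价/unit price, so with the default
# min_matches=3 it qualifies; when several long tables qualify, the last one in
# document order is returned together with its key.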
def extract_price_list(price_list, save_json=False, json_name="price_list.json", fuzzy=False):
"""
Extracts structured price list by first using hardcoded mapping, then falling back to AI if needed.
Set fuzzy=False to use direct string matching for mapping.
"""
# If price_list is empty, return an empty list
if not price_list:
return []
# Convert price_list to a list if it's a dict
if isinstance(price_list, dict):
# Check if the dict has any items
if len(price_list) == 0:
return []
# Convert to list if it's just a single entry dict
price_list = [price_list]
# Extract a sample row for header mapping
sample_row = price_list[0] if price_list else {}
# If there are no headers, return empty list
if not sample_row:
return []
# Get the headers directly from the sample row
extracted_headers = list(sample_row.keys())
# Clean double spaces in headers to facilitate matching
def clean_header_spaces(headers):
"""
Cleans headers for consistent matching by:
1. Normalizing multiple spaces to single space
2. Ensuring exactly one space between Chinese and English
"""
if not headers:
return headers
cleaned_headers = []
for header in headers:
if not header:
cleaned_headers.append(header)
continue
# Normalize multiple spaces to single space
header = re.sub(r'\s+', ' ', header)
# Ensure exactly one space between Chinese and English
header = re.sub(r'([\u4e00-\u9fff])\s*([a-zA-Z])', r'\1 \2', header)
header = re.sub(r'([a-zA-Z])\s*([\u4e00-\u9fff])', r'\1 \2', header)
# Final cleanup of any remaining multiple spaces
header = re.sub(r'\s+', ' ', header)
cleaned_headers.append(header.strip())
return cleaned_headers
# Define our target fields from the Pydantic model
target_fields = [
"序号", "名称", "名称(英文)", "品牌", "规格型号", "所属机型",
"数量", "单位", "单价", "总价", "几郎单价", "几郎总价",
"备注", "计划来源"
]
# Hardcoded mapping dictionary
hardcoded_mapping = {
# 序号 mappings
"序号": ["序号 no.", "序号 no", "no.", "no", "序号no.", "序号no", "序号 item", "序号item", "序号", "序号 no.:"],
# 名称 mappings
"名称": ["名称 name", "名称name", "name", "名称name of materials", "名称name of materials and equipment", "名称 name of materials", "名称 name of materials and equipment", "名称", "产品名称 product name", "货描", "commodity",],
# 名称(英文) mappings
"名称(英文)": ["名称 name", "名称name", "name", "名称name of materials", "名称name of materials and equipment", "名称 name of materials", "名称 name of materials and equipment", "名称", "产品名称 product name"],
# 品牌 mappings
"品牌": ["品牌 brand", "品牌brand", "brand", "品牌 brand", "品牌brand", "品牌"],
# 规格型号 mappings
"规格型号": ["规格型号 specification", "规格型号specification", "规格 specification", "规格specification",
"specification", "规格型号specification and model", "型号model", "型号 model", "规格型号 specification and model", "规格型号"],
# 所属机型 mappings
"所属机型": ["所属机型 applicable models", "所属机型applicable models", "applicable models", "所属机型"],
# 数量 mappings
"数量": ["数量 quantity", "数量quantity", "quantity", "qty", "数量qty", "数量"],
# 单位 mappings
"单位": ["单位 unit", "单位unit", "unit", "单位"],
# 单价 mappings
"单价": ["单价 unit price (cny)", "单价unit price (cny)", "单价(元)Unit Price (CNY)", "unit price (cny)", "单价unit price", "单价 unit price", "单价 unit price(cny)",
"单价(元)", "单价(cny)", "单价 unit price (cny)", "单价(欧元) unit price(eur)", "单价", "单价(元) unit price(cny)", "单价(元)unit price(cny)", "单价(欧元) unit price(eur)",
"价格 price", "价格price", "价格",
"美元单价"],
# 总价 mappings
"总价": ["总价 total amount (cny)", "总价total amount (cny)", "total amount (cny)", "总价total amount", "总价 total amount",
"总价(元)", "总额(元)", "总价 total amount (cny)", "总价(欧元) amount(eur)", "总价", "总价(元)amount (cny)", "总价(元)amount(cny)",
"总额 total amount (cny)", "总额", "总额 total amount","美元总价"],
# 几郎单价 mappings
"几郎单价": ["几郎单价 unit price (gnf)", "几郎单价unit price (gnf)", "unit price (gnf)", "几郎单价unit price", "几郎单价 unit price",
"几郎单价(元)", "单价(几郎)","单价 unit price (gnf)", "几郎单价 unit price (gnf)", "几郎单价", "单价 unit price(几郎)(gnf)", "单价(元)unit price(cny)", "几郎单价 unit price(gnf)"],
# 几郎总价 mappings
"几郎总价": ["几郎总价 total amount (gnf)", "几郎总价total amount (gnf)", "total amount (gnf)", "几郎总价total amount", "几郎总价 total amount",
"几郎总价(元)", "总额(几郎)", "几郎总价 total amount (gnf)", "几郎总价", "总额 total amount(几郎)(gnf)", "总价(元)amount(cny)", "几郎总价 amount(gnf)","总额 total amount (gnf)"],
# 备注 mappings
"备注": ["备注 remarks", "备注remarks", "remarks", "备注 notes", "备注notes", "note", "备注"],
# 计划来源 mappings
"计划来源": ["计划来源 plan no.", "计划来源plan no.", "计划来源(唛头信息)",
"计划来源 planned source", "计划来源planned source", "planned source", "计划来源","计划号 plan no."]
}
# Clean the extracted headers first
cleaned_extracted_headers = clean_header_spaces(extracted_headers)
# Clean all possible headers in the hardcoded mapping
cleaned_hardcoded_mapping = {
std_field: [clean_header_spaces([h])[0] for h in possible_headers]
for std_field, possible_headers in hardcoded_mapping.items()
}
# Fuzzy matching function
def fuzzy_match_header(header, possible_headers, threshold=70):
if not possible_headers:
return None, 0
best_match = process.extractOne(header, possible_headers, scorer=fuzz.ratio)
if best_match and best_match[1] >= threshold:
return best_match[0], best_match[1]
else:
return None, 0
# Try to map headers using hardcoded mapping (fuzzy or direct)
standard_field_mapping = {}
unmapped_headers = []
if fuzzy:
print("\n🔍 Fuzzy Hardcoded Mapping Results:")
else:
print("\n🔍 Direct Hardcoded Mapping Results:")
print("-" * 50)
for header in cleaned_extracted_headers:
header_mapped = False
if fuzzy:
best_match_score = 0
best_match_field = None
best_match_header = None
for std_field, possible_headers in cleaned_hardcoded_mapping.items():
if std_field in standard_field_mapping:
continue
matched_header, score = fuzzy_match_header(header, possible_headers, threshold=70)
if matched_header and score > best_match_score:
best_match_score = score
best_match_field = std_field
best_match_header = matched_header
if best_match_field and best_match_score >= 70:
standard_field_mapping[best_match_field] = header
header_mapped = True
print(f"✅ {best_match_field} -> {header} (score: {best_match_score})")
else:
for std_field, possible_headers in cleaned_hardcoded_mapping.items():
if std_field in standard_field_mapping:
continue
if header in possible_headers:
standard_field_mapping[std_field] = header
header_mapped = True
print(f"✅ {std_field} -> {header}")
break
if not header_mapped:
unmapped_headers.append(header)
print(f"❌ No match found for: {header}")
print("-" * 50)
# If we have unmapped headers, fall back to AI mapping
if unmapped_headers:
print(f"⚠️ Some headers could not be mapped using hardcoded mapping: {unmapped_headers}")
print("🔄 Falling back to AI mapping...")
# Get the list of standard fields that haven't been mapped yet
unmapped_standard_fields = [field for field in target_fields if field not in standard_field_mapping]
# Use AI to map remaining headers
base_prompt = f"""
You are playing a matching game. Match each standard field to the exact column header; the headers are given below in double quotes, separated by commas.
You must match all the given column headers to the standard fields to the best of your ability.
USE THE EXACT HEADER BELOW INCLUDING BOTH CHINESE AND ENGLISH AND THE EXACT SPACING.
The standard fields that need mapping are:
{json.dumps(unmapped_standard_fields, ensure_ascii=False)}
You are given column headers below: (YOU MUST USE THE EXACT HEADER BELOW INCLUDING BOTH CHINESE AND ENGLISH AND THE EXACT SPACING)
{json.dumps(unmapped_headers, ensure_ascii=False)}
ENSURE ALL STANDARD FIELDS ARE MAPPED TO THE EXACT COLUMN HEADER INCLUDING BOTH CHINESE AND ENGLISH AND THE EXACT SPACING.
Return only a JSON mapping in this format WITHOUT any explanations:
```json
{{
"standard_field_1": "column_header_1",
"standard_field_2": "column_header_2",
...
}}
```
Common mistakes to note:
Do not force map 名称(英文) to 单价
"""
messages = [{"role": "user", "content": base_prompt}]
client = OpenAI(
base_url=base_url,
api_key=HF_API_KEY,
)
# Add retry logic for AI mapping
max_retries = 3
for attempt in range(max_retries):
try:
print(f"🔄 Sending prompt to LLM (attempt {attempt + 1} of {max_retries})")
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=0.1,
)
raw_mapping = response.choices[0].message.content
think_text = re.findall(r"<think>(.*?)</think>", response.choices[0].message.content, flags=re.DOTALL)
if think_text:
print(f"🧠 Thought Process: {think_text}")
raw_mapping = re.sub(r"<think>.*?</think>\s*", "", raw_mapping, flags=re.DOTALL) # Remove think
# Remove any backticks or json tags
raw_mapping = re.sub(r"```json|```", "", raw_mapping)
# Parse the AI mapping and merge with hardcoded mapping
ai_mapping = json.loads(raw_mapping.strip())
standard_field_mapping.update(ai_mapping)
# Check if all standard fields are mapped
still_unmapped = [field for field in target_fields if field not in standard_field_mapping]
if still_unmapped:
print(f"⚠️ Some standard fields are still unmapped: {still_unmapped}")
if attempt < max_retries - 1:
# Add feedback to the prompt for the next attempt
messages.append({
"role": "assistant",
"content": response.choices[0].message.content
})
messages.append({
"role": "user",
"content": f"The following standard fields are still unmapped: {still_unmapped}. Please try to map these fields using the available headers: {unmapped_headers}"
})
continue
else:
print(f"✅ Successfully mapped all fields using AI")
print("\n📊 AI Mapping Results:")
print("-------------------")
for std_field, mapped_header in ai_mapping.items():
print(f"{std_field} -> {mapped_header}")
print("-------------------")
break
except Exception as e:
error_msg = f"Error in AI mapping attempt {attempt + 1}: {e}"
print(f"❌ {error_msg}")
if attempt < max_retries - 1:
messages.append({
"role": "assistant",
"content": response.choices[0].message.content
})
messages.append({
"role": "user",
"content": f"Your response had the following error: {error_msg}. Please fix your mapping and try again."
})
else:
print(f"⚠️ All AI mapping attempts failed, proceeding with partial mapping")
# After all mapping is done, print the final mapping and unmapped columns
print("\n📊 Final Field Mapping:")
print("-" * 50)
# Print all standard fields, showing mapping if exists or blank if not
for field in target_fields:
mapped_header = standard_field_mapping.get(field, "")
print(f"{field} -> {mapped_header}")
print("-" * 50)
# Check for unmapped standard fields
unmapped_standard = [field for field in target_fields if field not in standard_field_mapping]
if unmapped_standard:
print("\n⚠️ Unmapped Standard Fields:")
print("-" * 50)
for field in unmapped_standard:
print(f"- {field}")
print("-" * 50)
# Check for unmapped extracted headers
mapped_headers = set(standard_field_mapping.values())
unmapped_headers = [header for header in extracted_headers if header not in mapped_headers]
if unmapped_headers:
print("\n⚠️ Unmapped Extracted Headers:")
print("-" * 50)
for header in unmapped_headers:
print(f"- {header}")
print("-" * 50)
# Function to separate Chinese and English text
def separate_chinese_english(text):
if not text or not isinstance(text, str):
return "", ""
# Find all Chinese character positions
chinese_positions = []
for i, char in enumerate(text):
if '\u4e00' <= char <= '\u9fff':
chinese_positions.append(i)
if not chinese_positions:
# No Chinese characters, return empty Chinese and full text as English
return "", text.strip()
# Find the last Chinese character position
last_chinese_pos = chinese_positions[-1]
# Look for the best split point that preserves brackets and punctuation
split_pos = last_chinese_pos + 1
# Check if there are brackets or parentheses that should be kept together
# Look ahead to see if there are closing brackets that belong to the Chinese part
remaining_text = text[split_pos:]
# If the remaining text starts with closing brackets/parentheses, include them in the Chinese part
# This handles both Chinese brackets () and English brackets () that belong to Chinese text
if remaining_text:
# Check for closing brackets that should stay with Chinese
# Use raw string to avoid escape sequence warning
closing_brackets = ')】」』》〉""''()]'
if remaining_text[0] in closing_brackets:
# Find how many closing brackets we have
bracket_count = 0
for char in remaining_text:
if char in closing_brackets:
bracket_count += 1
else:
break
split_pos += bracket_count
# Everything up to the split point is Chinese
chinese_part = text[:split_pos].strip()
# Everything after the split point is English
english_part = text[split_pos:].strip()
# Clean up the parts
# Remove any trailing Chinese punctuation from English part if it doesn't make sense
if english_part:
# If English part starts with Chinese punctuation that doesn't belong, move it to Chinese
chinese_punct_start = re.match(r'^[、,。;:!?]+', english_part)
if chinese_punct_start:
chinese_part += chinese_punct_start.group()
english_part = english_part[len(chinese_punct_start.group()):].strip()
# If English part doesn't actually contain English letters, treat it as empty
if not re.search(r'[a-zA-Z]', english_part):
english_part = ""
return chinese_part, english_part
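# Illustrative example (hypothetical value):
# separate_chinese_english("PE波纹管 PE corrugated pipe") -> ("PE波纹管", "PE corrugated pipe")
# Everything up to and including the last Chinese character (plus any closing
# bracket that follows it) becomes the Chinese part; the rest is the English
# part, or "" when it contains no Latin letters.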
# Process the data based on the final mapping
transformed_data = []
for row in price_list:
new_row = {field: "" for field in target_fields} # Initialize with empty strings
other_fields = {}
# Step 1: Handle name fields first - look for any field with "名称" or "name"
for header, value in row.items():
# Skip if header is None
if header is None:
continue
# Clean the header for comparison
cleaned_header = re.sub(r'\s+', ' ', str(header)).strip()
header_lower = cleaned_header.lower()
if ("名称" in header_lower or "name" in header_lower) and value:
# If field contains both Chinese and English, separate them
if re.search(r'[\u4e00-\u9fff]', str(value)) and re.search(r'[a-zA-Z]', str(value)):
chinese, english = separate_chinese_english(str(value))
if chinese:
new_row["名称"] = chinese
if english:
new_row["名称(英文)"] = english
# print(f"Separated: '{value}' → Chinese: '{chinese}', English: '{english}'")
else:
# Just set the name directly
new_row["名称"] = str(value)
break # Stop after finding first name field
# Step 2: Fill in all other fields using standard mapping
for header, value in row.items():
# Skip if header is None
if header is None:
continue
# Skip empty values
if not value:
continue
# Clean the header for comparison
cleaned_header = re.sub(r'\s+', ' ', str(header)).strip()
# Check if this maps to a standard field using fuzzy matching
matched_field = None
best_match_score = 0
for std_field, mapped_header in standard_field_mapping.items():
# Skip if mapped_header is None
if mapped_header is None:
continue
# Use fuzzy matching for more flexible comparison
score = fuzz.ratio(cleaned_header.lower().strip(), mapped_header.lower().strip())
if score > best_match_score and score >= 80: # High threshold for data processing
best_match_score = score
matched_field = std_field
# If we found a mapping, use it (but don't overwrite name fields)
if matched_field:
if matched_field not in ["名称", "名称(英文)"] or not new_row[matched_field]:
new_row[matched_field] = str(value)
# If no mapping found, add to other_fields
else:
# Skip name fields we already processed
header_lower = cleaned_header.lower()
if not ("名称" in header_lower or "name" in header_lower):
other_fields[header] = str(value)
# Add remaining fields to "其他"
if other_fields:
new_row["其他"] = other_fields
else:
new_row["其他"] = {}
# Convert field names for validation
if "名称(英文)" in new_row:
new_row["名称(英文)"] = new_row.pop("名称(英文)")
transformed_data.append(new_row)
# Save to file if requested
if save_json and transformed_data:
# Handle edge cases before saving
transformed_data = handle_edge_cases(transformed_data)
with open(json_name, "w", encoding="utf-8") as f:
json.dump(transformed_data, f, ensure_ascii=False, indent=4)
print(f"✅ Saved to {json_name}")
# Handle edge cases (including duplicate column merging) before returning
transformed_data = handle_edge_cases(transformed_data)
return transformed_data
def json_to_excel(contract_summary, json_data, excel_path):
"""Converts extracted JSON tables to an Excel file."""
# Correctly parse the JSON string
contract_summary_json = json.loads(json.loads(contract_summary))
contract_summary_df = pd.DataFrame([contract_summary_json])
# Ensure json_data is a dictionary
if isinstance(json_data, str):
json_data = json.loads(json_data)
long_tables = [pd.DataFrame(table) for key, table in json_data.items() if "long_table" in key and "summary" not in key]
long_table = long_tables[-1] if long_tables else pd.DataFrame()
with pd.ExcelWriter(excel_path) as writer:
contract_summary_df.to_excel(writer, sheet_name="Contract Summary", index=False)
long_table.to_excel(writer, sheet_name="Price List", index=False)
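# Minimal usage sketch (hypothetical file name; this helper is not wired into
# extract_po, and it assumes `extracted_data` came from xml_to_json and
# `contract_summary` from extract_contract_summary):
# json_to_excel(contract_summary, extracted_data, "contract_summary.xlsx")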
#--- Handle Edge Cases ------------------------------
def handle_weight_conversion_edge_case(transformed_data):
"""
Handles the edge case where converted weight is in '其他' field.
If found, replaces quantity and unit with the converted weight values.
Extracts unit from the bracket in the column header.
"""
for row in transformed_data:
if "其他" not in row or not isinstance(row["其他"], dict):
continue
other_fields = row["其他"]
# Look for weight conversion column with various possible names
weight_key = None
weight_patterns = [
r"换算重量(吨)",
r"converted weight(t)",
r"换算重量",
r"converted weight",
r"重量换算",
r"weight conversion"
]
for key in other_fields:
# Check if any pattern is contained within the key
if any(re.search(pattern, key, re.IGNORECASE) for pattern in weight_patterns):
weight_key = key
break
if weight_key and other_fields[weight_key]:
try:
# Try to convert to float to ensure it's a valid number
weight_value = float(other_fields[weight_key])
# Only replace if the weight value is valid
if weight_value > 0:
# Store original values in case we need to revert
original_quantity = row.get("数量", "")
original_unit = row.get("单位", "")
# Extract unit from the bracket in the column header
unit = "吨" # default unit
bracket_match = re.search(r'[((]([^))]+)[))]', weight_key)
if bracket_match:
unit = bracket_match.group(1).strip()
# Clean up the unit (remove any extra text)
unit = re.sub(r'[^a-zA-Z\u4e00-\u9fff]', '', unit)
# Replace with converted weight
row["数量"] = str(weight_value)
row["单位"] = unit
# Log the conversion
print(f"Converted weight: {weight_value}{unit} (original: {original_quantity} {original_unit})")
# Remove the weight field from other_fields
del other_fields[weight_key]
except (ValueError, TypeError):
# If conversion fails, log and skip
print(f"Warning: Invalid weight value '{other_fields[weight_key]}' in row")
continue
return transformed_data
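# Illustrative example (hypothetical row): given
# {"数量": "500", "单位": "件", "其他": {"换算重量(吨)": "12.5"}}
# the row becomes 数量 = "12.5" and 单位 = "吨" (the unit is read from the
# bracket in the column header), and the weight entry is removed from 其他.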
def handle_edge_cases(transformed_data):
"""
Main function to handle all edge cases in the transformed data.
Currently handles:
1. Weight conversion from '其他' field
2. Duplicate column merging
"""
# Handle weight conversion edge case
transformed_data = handle_weight_conversion_edge_case(transformed_data)
# Handle duplicate column merging
transformed_data = merge_duplicate_columns(transformed_data)
return transformed_data
def merge_duplicate_columns(transformed_data):
"""
Removes duplicate columns that were created due to column spanning in headers.
Simply deletes columns with names ending in _2, _3, etc.
"""
if not transformed_data:
return transformed_data
# Find all duplicate columns (ending with _number)
duplicate_columns = set()
for row in transformed_data:
for column in row.keys():
# Check if this is a duplicate column (ends with _number)
if re.match(r'^.+_\d+$', column):
duplicate_columns.add(column)
# Remove all duplicate columns from all rows
if duplicate_columns:
print(f"🗑️ Removing duplicate columns: {sorted(duplicate_columns)}")
for row in transformed_data:
for dup_col in duplicate_columns:
if dup_col in row:
del row[dup_col]
return transformed_data
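# Illustrative example (hypothetical row): {"名称 name": "轮胎", "备注": "urgent", "备注_2": "urgent"}
# loses its "备注_2" key, since any column name ending in _<number> is treated as a
# leftover of a horizontally merged header and dropped from every row.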
#--- Extract PO ------------------------------
def extract_po(docx_path):
"""Processes a single .docx file, extracts tables, formats with OpenAI, and returns combined JSON data."""
if not os.path.exists(docx_path) or not docx_path.endswith(".docx"):
raise ValueError(f"Invalid file: {docx_path}")
# Read the .docx file as bytes
with open(docx_path, "rb") as f:
docx_bytes = BytesIO(f.read())
try:
# Step 1: Extract XML content from DOCX
print("Extracting Docs data to XML...")
xml_filename = os.path.splitext(os.path.basename(docx_path))[0] + "_document.xml"
try:
xml_file = extract_docx_as_xml(docx_bytes, save_xml=False, xml_filename=xml_filename)
get_namespace(ET.fromstring(xml_file))
except (zipfile.BadZipFile, KeyError):
raise ValueError(f"Invalid file: {docx_path}")
# Step 2: Extract tables from DOCX and save JSON
print("Extracting XML data to JSON...")
json_filename = os.path.splitext(os.path.basename(docx_path))[0] + "_extracted_data.json"
extracted_data = xml_to_json(xml_file, save_json=False, json_filename=json_filename)
# Find and rename the price list table before contract summary processing
print("Identifying Price List table...")
extracted_data_dict = json.loads(extracted_data)
price_list_table, price_list_key = find_price_list_table(extracted_data_dict)
# Add the combined price list table to the extracted data
if price_list_table:
# Remove only the specific long_table that was used to create the price list
if price_list_key:
del extracted_data_dict[price_list_key]
# Add the combined price list table
extracted_data_dict["price_list"] = price_list_table
# Update the extracted_data string with proper formatting
extracted_data = json.dumps(extracted_data_dict, ensure_ascii=False, indent=4)
else:
print("⚠️ No suitable price list table found!")
extracted_data_dict["price_list"] = []
extracted_data = json.dumps(extracted_data_dict, ensure_ascii=False, indent=4)
# print(f"✅ Extracted Data: {extracted_data}")
# Create a copy of the data with only first row of price list for contract summary
contract_summary_dict = json.loads(extracted_data)
if contract_summary_dict.get("price_list"):
contract_summary_dict["price_list"] = [contract_summary_dict["price_list"][0]] if contract_summary_dict["price_list"] else []
contract_summary_data = json.dumps(contract_summary_dict, ensure_ascii=False, indent=4)
print(f"✅ Contract Summary Data: {contract_summary_data}")
# Step 3: Process JSON with OpenAI to get structured output
print("Processing Contract Summary data with AI...")
contract_summary_filename = os.path.splitext(os.path.basename(docx_path))[0] + "_contract_summary.json"
contract_summary = extract_contract_summary(contract_summary_data, save_json=False, json_filename=contract_summary_filename)
# Process the price list
print("Processing Price List data with AI...")
price_list_filename = os.path.join(os.path.dirname(docx_path), os.path.splitext(os.path.basename(docx_path))[0] + "_price_list.json")
price_list = extract_price_list(price_list_table, save_json=False, json_name=price_list_filename, fuzzy=True)
# Step 4: Combine contract summary and long table data into a single JSON object
print("Combining AI Generated JSON with Extracted Data...")
combined_data = {
"contract_summary": json.loads(json.loads(contract_summary)),
"price_list": price_list
}
return combined_data
finally:
# Ensure BytesIO is properly closed
if 'docx_bytes' in locals():
docx_bytes.close()
# Example Usage
# print(extract_po("test-contracts\GN-SMB268202501-042WJ SMB268波纹管采购合同-东营顺航.docx"))
# print(extract_po(r"UAT Contracts\20250703\GN-WAPJS202405-297HG 1200R20轮胎采购合同-威海君乐-法务审批0515.docx"))
# print(extract_price_list([{'序号 No.': '1', '名称 Name': 'PE波纹管(双壁波纹管) PE corrugated pipe (double wall corrugated pipe)', '规格 Specification': '内径600mm,6米/根,SN8 Inner diameter 600mm, 6 meters per piece, SN8', '单位 Unit': '米m', '数量 Quantity': '180', '单价(元) Unit Price (CNY)': '106.00', '总额(元) Total Amount (CNY)': '1080.00', '几郎单价(元) Unit Price (GNF)': '16.21', '几郎总额(元) Total Amount (GNF)': '22118.38', '品牌 Brand': '鹏洲PZ', '计划来源 Planned Source': 'SMB268-GNHY-0021-WJ-20250108'}]))
# Gradio Interface ------------------------------
import gradio as gr
from gradio.themes.base import Base
interface = gr.Interface(
fn=extract_po,
title="PO Extractor 买卖合同数据提取",
inputs=gr.File(label="买卖合同 (.docx)"),
outputs=gr.Json(label="提取结果"),
flagging_mode="never",
theme=Base()
)
interface.launch(show_error=True)