|
|
|
|
|
|
|
|
import re
|
|
|
from typing import List, Dict, Optional, Any
|
|
|
from datetime import datetime
|
|
|
from difflib import SequenceMatcher
|
|
|
|
|
|
# Month names and common abbreviations mapped to their month number.
_MONTH_NUMBERS = {
    'jan': 1, 'january': 1,
    'feb': 2, 'february': 2,
    'mar': 3, 'march': 3,
    'apr': 4, 'april': 4,
    'may': 5,
    'jun': 6, 'june': 6,
    'jul': 7, 'july': 7,
    'aug': 8, 'august': 8,
    'sep': 9, 'sept': 9, 'september': 9,
    'oct': 10, 'october': 10,
    'nov': 11, 'november': 11,
    'dec': 12, 'december': 12,
}

# Years are either two or four digits; a three-digit run such as "201" is a
# truncated/garbled year and must not be accepted as year 201.
_NUMERIC_DATE_RE = re.compile(
    r'\b(\d{1,2})[\s/|.-](\d{1,2})[\s/|.-](\d{4}|\d{2})\b'
)
_DAY_MONTHNAME_RE = re.compile(
    r'\b(\d{1,2})[\s/.-]?([A-Za-z]{3,9})[\s/.-]?(\d{4}|\d{2})\b'
)
_MONTHNAME_DAY_RE = re.compile(
    r'\b([A-Za-z]{3,9})[\s.-]?(\d{1,2})[,\s.-]+(\d{4}|\d{2})\b'
)
# The lookbehind stops a year that ENDS one date (e.g. the first half of
# "01/03/2018-12/03/2018") from being read as the start of a year-first date.
_ISO_DATE_RE = re.compile(
    r'(?<!\d[/|.-])\b(\d{4})[-/](\d{1,2})[-/](\d{1,2})\b'
)


def _expand_year(year_token: str) -> int:
    """Turn a 2- or 4-digit year string into a full year (00-49 -> 20xx, 50-99 -> 19xx)."""
    year = int(year_token)
    if year < 100:
        return year + (2000 if year < 50 else 1900)
    return year


def _format_date(day: int, month: int, year: int) -> Optional[str]:
    """Return ``DD/MM/YYYY`` for a real calendar date, or None if it is invalid."""
    try:
        parsed = datetime(year, month, day)
    except ValueError:
        return None
    # Explicit formatting: strftime("%Y") padding for years < 1000 differs
    # between platforms.
    return '{:02d}/{:02d}/{:04d}'.format(parsed.day, parsed.month, parsed.year)


def extract_dates(text: str) -> List[str]:
    """
    Extract calendar dates from free text and normalise them to DD/MM/YYYY.

    Recognised shapes:
    - Numeric day-first: DD/MM/YYYY, DD-MM-YYYY, DD.MM.YYYY, also with a
      space or an OCR pipe (|) as separator.
    - Day + month name: 22 Mar 18, 22-Mar-2018.
    - Month name + day: March 22, 2018.
    - Year-first: YYYY-MM-DD or YYYY/MM/DD.

    Every candidate is validated with ``datetime`` so impossible dates
    (e.g. 31/02/2018) are dropped. Two-digit years are expanded with a
    pivot at 50.

    Args:
        text: Raw (typically OCR) text to scan.

    Returns:
        De-duplicated date strings. Results are grouped by pattern in the
        order listed above (not by position in the text); within one pattern
        they follow text order.
    """
    if not text:
        return []

    found: List[str] = []

    def keep(day: int, month: int, year: int) -> None:
        formatted = _format_date(day, month, year)
        if formatted is not None:
            found.append(formatted)

    # Blank out year-first dates before the day-first scan. Otherwise the tail
    # of "2018-03-12 10:30" is misread as the day-first date 03-12-10.
    # Each match is replaced by an equally long run of spaces; a run of 8+
    # spaces can never act as the single-character separator of the pattern.
    day_first_text = _ISO_DATE_RE.sub(lambda m: ' ' * len(m.group(0)), text)

    for day, month, year in _NUMERIC_DATE_RE.findall(day_first_text):
        keep(int(day), int(month), _expand_year(year))

    for day, month_name, year in _DAY_MONTHNAME_RE.findall(text):
        month = _MONTH_NUMBERS.get(month_name.lower())
        if month:
            keep(int(day), month, _expand_year(year))

    for month_name, day, year in _MONTHNAME_DAY_RE.findall(text):
        month = _MONTH_NUMBERS.get(month_name.lower())
        if month:
            keep(int(day), month, _expand_year(year))

    for year, month, day in _ISO_DATE_RE.findall(text):
        keep(int(day), int(month), int(year))

    # dict.fromkeys removes duplicates while preserving first-seen order.
    return list(dict.fromkeys(found))
|
|
|
|
|
|
def extract_amounts(text: str) -> List[float]:
    """
    Find every monetary amount written with exactly two decimal places.

    Both grouped ("1,234.56") and ungrouped ("1234.56") integer parts are
    accepted. Thousands separators are removed before conversion.

    Args:
        text: Text to scan; may be empty or None.

    Returns:
        The amounts as floats, in the order they appear in ``text``.
    """
    if not text:
        return []

    # First alternative: comma-grouped thousands. Second: a plain digit run,
    # so amounts >= 1000 printed without separators are not skipped.
    amount_pattern = r'\b(?:\d{1,3}(?:,\d{3})+|\d+)\.\d{2}\b'

    # The pattern only yields digits, commas and one dot, so float() cannot fail.
    return [
        float(token.replace(',', ''))
        for token in re.findall(amount_pattern, text)
    ]
|
|
|
|
|
|
def extract_total(text: str) -> Optional[float]:
    """
    Extract the receipt total using keyword matching, then a footer search.

    Strategy:
    1. Look for a total-like keyword (TOTAL, AMOUNT DUE, GRAND TOTAL, BALANCE,
       PAYABLE) followed by an amount with two decimals. Only letters,
       underscores and whitespace, then an optional ":" and/or "$", may sit
       between keyword and amount. The LAST such match in the text wins.
    2. Otherwise take the largest amount found in the last ~30% of the lines.

    Args:
        text: Raw receipt text; may be empty or None.

    Returns:
        The total as a float, or None when nothing suitable is found.
    """
    if not text:
        return None

    # The filler between keyword and amount must not contain digits. A greedy
    # filler that accepts digits swallows the amount's leading digits and the
    # regex then backtracks to a truncated value ("TOTAL 123.45" -> 3.45).
    keyword_pattern = (
        r'(?:TOTAL|AMOUNT DUE|GRAND TOTAL|BALANCE|PAYABLE)'
        r'(?:[^\W\d]|\s)*:?\s*\$?\s*(\d[\d,]*\.\d{2})'
    )
    keyword_hits = re.findall(keyword_pattern, text, re.IGNORECASE)
    if keyword_hits:
        # The capture is digits/commas plus ".dd", so float() cannot fail.
        return float(keyword_hits[-1].replace(',', ''))

    lines = text.split('\n')
    footer_count = int(len(lines) * 0.3)
    # With fewer than four lines the count is 0; the whole text is scanned in
    # that case (this is what the slice ``lines[-0:]`` evaluates to).
    footer_lines = lines[-footer_count:] if footer_count else lines

    candidates = []
    for line in footer_lines:
        for amount in extract_amounts(line):
            # Whole numbers in 2000-2030 are skipped; presumably these are
            # years picked up as amounts.
            if 2000 <= amount <= 2030 and float(amount).is_integer():
                continue
            candidates.append(amount)

    return max(candidates) if candidates else None
|
|
|
|
|
|
def extract_vendor(text: str) -> Optional[str]:
    """
    Guess the vendor (merchant) name from the header of a receipt.

    Strategy:
    1. Return the first of the top 10 lines that contains a company suffix
       (SDN BHD, INC, LTD, LLC, PLC, CORP, PTY, PVT, LIMITED, ...) as a whole
       word.
    2. Otherwise return the first of the top 5 lines that is longer than
       three characters and does not contain a ``dd/dd`` date-like fragment.

    Args:
        text: Raw receipt text; may be empty or None.

    Returns:
        The stripped vendor line, or None when no line qualifies.
    """
    if not text:
        return None

    lines = text.strip().split('\n')

    # Whole-word matching: a plain substring test would treat "INCLUDING",
    # "PRINCE" or "UNLIMITED" as company suffixes. The long forms are listed
    # explicitly because the word boundary no longer lets CORP/INC match them.
    suffix_re = re.compile(
        r'\b(?:SDN\.?\s*BHD|INCORPORATED|INC|LTD|LLC|PLC|CORPORATION|CORP'
        r'|PTY|PVT|LIMITED)\b',
        re.IGNORECASE,
    )

    for line in lines[:10]:
        if suffix_re.search(line):
            return line.strip()

    for line in lines[:5]:
        stripped = line.strip()
        if len(stripped) > 3 and not re.search(r'\d{2}/\d{2}', line):
            return stripped

    return None
|
|
|
|
|
|
def extract_invoice_number(text: str) -> Optional[str]:
    """
    Extract an invoice / receipt number from receipt text.

    Strategy:
    1. Keyword pass: a document word (INVOICE, INV, BILL, RECEIPT, SLIP,
       optionally preceded by TAX) followed by NO / NUMBER / # / NUM and then
       a token of letters, digits, "-" or "/". The first token that is at
       least three characters long and contains a digit is returned as written.
    2. Line pass over the first 25 lines: skip lines that look like
       registration/phone lines (unless they mention INVOICE), and on lines
       containing an invoice-like hint return the first token of 3+
       characters that contains a digit (upper-cased).

    Args:
        text: Raw receipt text; may be empty or None.

    Returns:
        The invoice number, or None when no candidate is found.
    """
    if not text:
        return None

    # Lines with these markers usually carry registration or phone numbers.
    # NOTE: these are substring tests, so e.g. "HOTEL" triggers "TEL".
    toxic_line_indicators = ['GST', 'REG', 'SSM', 'TIN', 'PHONE', 'TEL', 'FAX', 'UBL', 'UEN']
    line_hints = ['INVOICE', ' NO', ' #', 'INV', 'SLIP', 'BILL']

    # "[\s:.]*" accepts any mix of spaces, colons and dots after the label,
    # so "No.:" and "No . :" work as well as "No:" and "No.".
    keyword_pattern = (
        r'(?i)(?:TAX\s*)?(?:INVOICE|INV|BILL|RECEIPT|SLIP)\s*'
        r'(?:NO|NUMBER|#|NUM)[\s:.]*([A-Z0-9\-/]+)'
    )
    for candidate in re.findall(keyword_pattern, text):
        # Requiring a digit rejects word fragments: in "INVOICE NOTICE" the
        # label "NO" matches and the capture would be "TICE". It also rejects
        # plain label words such as DATE or TAX.
        if len(candidate) >= 3 and any(ch.isdigit() for ch in candidate):
            return candidate

    for line in text.split('\n')[:25]:
        line_upper = line.upper()

        if "INVOICE" not in line_upper and any(
            bad in line_upper for bad in toxic_line_indicators
        ):
            continue

        if not any(hint in line_upper for hint in line_hints):
            continue

        for token in re.findall(r'\b[A-Z0-9\-/]{3,}\b', line_upper):
            if any(ch.isdigit() for ch in token):
                return token

    return None
|
|
|
|
|
|
def extract_bill_to(text: str) -> Optional[Dict[str, str]]:
    """
    Find the recipient named after a "BILL TO" / "BILLED TO" label.

    The label is matched case-insensitively. Everything after it, up to the
    end of that line, is taken as the name. If only colons/whitespace follow
    the label, the capture starts on the next non-blank text.

    Args:
        text: Raw receipt text; may be empty or None.

    Returns:
        ``{"name": <stripped name>, "email": None}`` when the label is
        present (the email is never filled in here), otherwise None.
    """
    if not text:
        return None

    label_re = re.compile(r'(?:BILL|BILLED)\s*TO[:\s]+([^\n]+)', re.IGNORECASE)
    found = label_re.search(text)
    if found is None:
        return None

    recipient = found.group(1).strip()
    return {"name": recipient, "email": None}
|
|
|
|
|
|
def extract_address(text: str, vendor_name: Optional[str] = None) -> Optional[str]:
    """
    Extract the vendor address using positional heuristics.

    Strategy:
    1. If ``vendor_name`` is given, locate it in the first 15 non-blank lines
       (case-insensitive substring, or a fuzzy ratio above 0.8 for names
       longer than 5 characters) and collect up to three lines that follow
       it, stopping at the first line that looks like contact info.
    2. Otherwise, or if step 1 collected nothing, scan the first 10 non-blank
       lines for an address-like start (contains a digit, longer than 10
       characters, at least two spaces) and collect up to three lines.

    Args:
        text: Raw receipt text; may be empty or None.
        vendor_name: Vendor name to anchor the search on, if known.

    Returns:
        The collected lines joined with ", ", or None when nothing qualifies.
    """
    if not text:
        return None

    lines = [line.strip() for line in text.split('\n') if line.strip()]
    vendor_lower = vendor_name.lower() if vendor_name else None

    def is_invalid_line(line):
        """Return True for lines that must not be part of an address."""
        line_upper = line.upper()

        # Contact details. NOTE: substring tests, so "HOTEL" triggers "TEL".
        if any(x in line_upper for x in ['TEL', 'FAX', 'PHONE', 'EMAIL', '@', 'WWW.', '.COM', 'HTTP']):
            return True

        # Short line with digits and a slash/dash: presumably a date or a
        # short reference number.
        if len(line) < 15 and any(c.isdigit() for c in line) and ('/' in line or '-' in line):
            return True

        # The vendor's own name line is not address content.
        if vendor_lower and vendor_lower in line.lower():
            return True

        return False

    if vendor_lower:
        for i, line in enumerate(lines[:15]):
            line_lower = line.lower()
            # Compare lower-cased strings in the fuzzy test too, so that it
            # agrees with the case-insensitive substring test next to it.
            is_vendor_line = vendor_lower in line_lower or (
                len(vendor_name) > 5
                and SequenceMatcher(None, vendor_lower, line_lower).ratio() > 0.8
            )
            if not is_vendor_line:
                continue

            candidate_lines = []
            for next_line in lines[i + 1:i + 4]:
                if is_invalid_line(next_line):
                    break
                candidate_lines.append(next_line)

            if candidate_lines:
                return ", ".join(candidate_lines)
            # Only the first vendor occurrence is considered.
            break

    fallback_candidates = []
    started_collecting = False

    for line in lines[:10]:
        if is_invalid_line(line):
            # Once an address has started, contact info marks its end.
            if started_collecting:
                break
            continue

        has_digits = any(c.isdigit() for c in line)
        is_long_enough = len(line) > 10
        is_multi_word = line.count(' ') >= 2

        # The first address line must contain a digit; continuation lines
        # need not.
        is_valid_first_line = has_digits and is_long_enough and is_multi_word
        is_valid_continuation = started_collecting and is_long_enough and is_multi_word

        if is_valid_first_line or is_valid_continuation:
            fallback_candidates.append(line)
            started_collecting = True

            if len(fallback_candidates) >= 3:
                break

    if fallback_candidates:
        return ", ".join(fallback_candidates)

    return None
|
|
|
|
|
|
def extract_line_items(text: str) -> List[Dict[str, Any]]:
    """
    Placeholder for line-item extraction.

    The input is ignored; an empty list is always returned.
    """
    no_items: List[Dict[str, Any]] = []
    return no_items
|
|
|
|
|
|
def structure_output(text: str) -> Dict[str, Any]:
    """
    Legacy wrapper for the rule-based-only pipeline.

    Runs the individual extractors on ``text`` and bundles their results.

    Args:
        text: Raw receipt text.

    Returns:
        A dict with the keys ``receipt_number``, ``date`` (the first date
        returned by ``extract_dates``, or None), ``total_amount``, ``vendor``
        and ``raw_text`` (the unmodified input).
    """
    # Scan for dates once and reuse the result instead of running the whole
    # extraction a second time just to index into it.
    dates = extract_dates(text)

    return {
        "receipt_number": extract_invoice_number(text),
        "date": dates[0] if dates else None,
        "total_amount": extract_total(text),
        "vendor": extract_vendor(text),
        "raw_text": text,
    }