NuExtract-v1.5 / data.py
liamcripwell's picture
skip field syncing if not dict (i.e is array)
2283240
raw
history blame
3.21 kB
import json
def extract_leaves(item, path=None, leaves=None):
    """
    Collect every non-empty-string leaf value of a nested dict/list structure.

    Args:
        item: The structure (or scalar) to walk.
        path (list, optional): Dictionary keys leading to ``item``. List
            positions are not recorded, so all elements of a list share
            their parent's path.
        leaves (list, optional): Accumulator that results are appended to;
            a new list is created when omitted.

    Returns:
        list: ``(path, value)`` tuples in traversal order. Values equal to
        ``''`` are skipped.
    """
    leaves = [] if leaves is None else leaves
    path = [] if path is None else path

    if isinstance(item, dict):
        for name, child in item.items():
            extract_leaves(child, path + [name], leaves)
        return leaves

    if isinstance(item, list):
        # List elements inherit the parent's path unchanged.
        for element in item:
            extract_leaves(element, path, leaves)
        return leaves

    # Scalar leaf: keep everything except the empty string.
    if item != '':
        leaves.append((path, item))
    return leaves
def split_document(document, window_size, overlap, tokenizer):
    """
    Split a document into overlapping chunks of at most ``window_size`` tokens.

    Args:
        document (str): The text to split.
        window_size (int): Maximum number of tokens per chunk.
        overlap (int): Number of tokens shared by consecutive chunks.
        tokenizer: Object providing ``tokenize(text)`` and
            ``convert_tokens_to_string(tokens)``. The token sequence is
            assumed to support ``len`` and slicing.

    Returns:
        list: Chunk strings in document order. If the document has no more
        than ``window_size`` tokens, the original ``document`` is returned
        as the single chunk (not re-detokenized).

    Raises:
        ValueError: If the document has to be split but ``overlap`` is not
            smaller than ``window_size`` (the window could never advance).
    """
    tokens = tokenizer.tokenize(document)
    total = len(tokens)
    print(f"\tLength of document: {total} tokens")

    chunks = []
    if total > window_size:
        stride = window_size - overlap
        if stride <= 0:
            # A zero stride makes range() fail with an unrelated message and
            # a negative one yields no chunks at all, silently dropping the
            # document. Fail loudly instead.
            raise ValueError(
                f"overlap ({overlap}) must be smaller than "
                f"window_size ({window_size})"
            )
        for start in range(0, total, stride):
            window = tokens[start:start + window_size]
            end = start + len(window)
            print(f"\t{start} to {end}")
            chunks.append(tokenizer.convert_tokens_to_string(window))
            # Stop once a window reaches the end so no redundant tail chunk
            # (fully contained in this one) is emitted.
            if end >= total:
                break
    else:
        chunks.append(document)
    print(f"\tSplit into {len(chunks)} chunks")
    return chunks
def handle_broken_output(pred, prev):
    """
    Fall back to the previous prediction when the new one is unusable.

    Args:
        pred: Candidate prediction, expected to be a JSON object string.
        prev: Value to return when ``pred`` is rejected.

    Returns:
        ``pred`` if it parses to a JSON object with at least one value that
        is neither ``""`` nor ``[]``; otherwise ``prev``. An object with no
        keys counts as empty.
    """
    try:
        parsed = json.loads(pred)
    except (ValueError, TypeError, RecursionError):
        # Broken JSON (JSONDecodeError is a ValueError), a non-string input,
        # or pathologically deep nesting: keep the previous prediction.
        return prev

    # Valid JSON that is not an object (list, number, string, ...) has no
    # fields to inspect and is treated as broken output.
    if not isinstance(parsed, dict):
        return prev

    # Every field empty: nothing was extracted, keep the previous prediction.
    if all(value in ("", []) for value in parsed.values()):
        return prev

    return pred
def clean_json_text(text):
    """
    Tidy raw JSON text before it is parsed.

    Strips leading/trailing whitespace and removes the backslash from
    backslash-escaped ``#`` and ``&`` characters, which are not valid JSON
    escape sequences.

    Args:
        text (str): Raw JSON text.

    Returns:
        str: The cleaned text.
    """
    text = text.strip()
    # The backslashes are spelled as "\\" so the literals do not depend on
    # Python passing unknown escapes such as "\#" through unchanged (a
    # SyntaxWarning on recent versions).
    text = text.replace("\\#", "#").replace("\\&", "&")
    return text
def sync_empty_fields(dict1, dict2):
    """
    Synchronize empty fields between two dictionaries.

    ``dict1`` is modified in place so that its set of keys follows ``dict2``:

    * keys whose value in ``dict2`` is a dict are synced recursively
      (a non-dict value already in ``dict1`` is left as it is);
    * keys that are empty in ``dict2`` and missing from ``dict1`` are added
      as empty;
    * keys that are non-empty in ``dict2`` but empty in ``dict1`` are removed
      from ``dict1``;
    * keys of ``dict1`` that do not exist in ``dict2`` are removed.

    "Empty" means equal to ``None``, ``""``, ``[]`` or ``{}``.

    Args:
        dict1 (dict): The dictionary to be modified.
        dict2 (dict): The reference dictionary with empty fields to be synced.

    Returns:
        dict: The modified ``dict1``. If either argument is not a dict
        (e.g. it is an array), ``dict1`` is returned untouched.
    """
    if not isinstance(dict1, dict) or not isinstance(dict2, dict):
        return dict1

    empty_values = (None, "", [], {})

    for key, value in dict2.items():
        if isinstance(value, dict):
            # Nested object: recurse, starting from an empty dict if missing.
            dict1[key] = sync_empty_fields(dict1.get(key, {}), value)
        elif value in empty_values:
            if key not in dict1:
                # Insert a fresh list rather than dict2's own object so that
                # filling the field in dict1 later cannot mutate dict2.
                dict1[key] = [] if isinstance(value, list) else value
        elif key in dict1 and dict1[key] in empty_values:
            # dict2 has a real value here, so an empty placeholder in dict1
            # is dropped.
            del dict1[key]

    # Drop fields that the reference does not define. The key list is
    # materialized first because dict1 is modified while it is processed.
    for key in [name for name in dict1 if name not in dict2]:
        del dict1[key]
    return dict1