RAGDemo / src /preprocessing /consolidate.py
derek-thomas's picture
derek-thomas HF staff
Adding code
6f49ee6
raw
history blame contribute delete
No virus
3.11 kB
import json
from pathlib import Path
from time import perf_counter
from typing import Any, Dict
from tqdm.auto import tqdm
def folder_to_json(folder_in: Path, json_path: Path):
"""
Process JSON lines from files in a given folder and write processed data to a new JSON file.
Parameters:
folder_in (Path): Path to the input folder containing the JSON files to process.
json_path (Path): Path to the output JSON file where the processed data will be written.
Example:
folder_to_json(Path("/path/to/input/folder"), Path("/path/to/output.json"))
"""
json_out = [] # Initialize list to hold processed JSON data from all files
process_start = perf_counter()
# Use rglob to get all JSON files and sort them by their full path
all_files = sorted(folder_in.rglob('*wiki*'), key=lambda x: str(x))
# Initialize progress bar with total file count, description, and unit of progress
with tqdm(total=len(all_files), desc='Processing', unit='file') as pbar:
# Iterate through all files in the input folder in order
for file_path in all_files:
# Update progress bar postfix to display current file and directory
pbar.set_postfix_str(f"File: {file_path.name} | Dir: {file_path.parent}", refresh=True)
# Open and read the current file
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
# Load JSON data from each line and process it
article = json.loads(line)
# Add restructured article to the output list
json_out.extend([restructure_articles(article)])
# Update progress bar after processing each file
pbar.update(1)
time_taken_to_process = perf_counter() - process_start
pbar.write(f"Wiki processed in {round(time_taken_to_process, 2)} seconds!")
# Notify that the writing process is starting
pbar.write("Writing file!")
write_start = perf_counter()
# Open the output file and write the processed data as JSON
with open(json_path, "w", encoding='utf-8') as outfile:
json.dump(json_out, outfile)
time_taken_to_write = perf_counter() - write_start
# Notify that the writing process is complete
pbar.write(f"File written in {round(time_taken_to_write, 2)} seconds!")
def restructure_articles(article: Dict[str, Any]) -> Dict[str, Any]:
"""
Restructures the given article into haystack's format, separating content and meta data.
Args:
- article (Dict[str, Any]): The article to restructure.
Returns:
- Dict[str, Any]: The restructured article.
"""
# Extract content and separate meta data
article_out = {
'content': article['text'],
'meta': {k: v for k, v in article.items() if k != 'text'}
}
return article_out
if __name__ == '__main__':
proj_dir = Path(__file__).parents[2]
folder = proj_dir / 'data/raw/output'
file_out = proj_dir / 'data/consolidated/simple_wiki.json'
folder_to_json(folder, file_out)
print('Done!')