Spaces:
Paused
Paused
File size: 2,446 Bytes
3bbba47 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
import json
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
def process_item(item):
    """Turn one dataset entry into a Parquet-ready record.

    Args:
        item: Mapping with an ``"image"`` key holding a path to a file and a
            ``"conversations"`` key holding a JSON-serializable value.

    Returns:
        dict whose ``"image"`` entry is the raw bytes of the file and whose
        ``"conversations"`` entry is the conversations value encoded as a
        JSON string. Keys appear in that order.
    """
    image_path = item["image"]
    with open(image_path, "rb") as handle:
        payload = handle.read()
    # Stored as a JSON string so the Parquet column is a plain string column
    encoded_conversations = json.dumps(item["conversations"])
    return dict(image=payload, conversations=encoded_conversations)
# Read the JSON file: a list of items, each expected to carry an "image" path
# and a "conversations" value (the fields process_item reads).
with open('merged_half.json', 'r') as file:
    data = json.load(file)

# Output Parquet file path
local_path = 'merged_first_half.parquet'

# Get the number of CPU cores in the system. os.cpu_count() may return None,
# in which case ThreadPoolExecutor uses its own default worker count.
cpu_count = os.cpu_count()

# Process data in batches so image bytes are accumulated per batch rather
# than for the whole dataset at once.
batch_size = 100000  # Can be adjusted based on actual needs
num_batches = (len(data) + batch_size - 1) // batch_size

# Rows passed to each ParquetWriter.write_table call; each call starts a new
# row group, so this caps the row-group size of the output file.
rows_per_write = 4

failed_count = 0
with open(local_path, 'wb') as local_file:
    writer = None
    for batch_index in range(num_batches):
        start_index = batch_index * batch_size
        end_index = min(start_index + batch_size, len(data))
        batch_data = data[start_index:end_index]

        # Read images in parallel with threads. Results are consumed in
        # submission order (not completion order) so the Parquet rows keep
        # the same order as the input JSON.
        records = []
        with ThreadPoolExecutor(max_workers=cpu_count) as executor:
            futures = [executor.submit(process_item, item) for item in batch_data]
            progress = tqdm(futures, desc=f"Processing Batch {batch_index + 1}/{num_batches}")
            for offset, future in enumerate(progress):
                try:
                    records.append(future.result())
                except Exception as exc:
                    # Best-effort: skip the failing item but say which one it was.
                    failed_count += 1
                    print(f'Generated an exception for item {start_index + offset}: {exc}')

        if not records:
            # Every item in this batch failed. An empty DataFrame has no
            # columns, which would either set an empty file schema or fail
            # the writer's schema check, so skip the batch instead.
            print(f"Skipping Batch {batch_index + 1}/{num_batches}: no records were produced")
            continue

        # Create a PyArrow table
        table = pa.Table.from_pandas(pd.DataFrame(records))

        # The first non-empty batch determines the file schema
        if writer is None:
            writer = pq.ParquetWriter(local_file, table.schema, version='2.6', use_dictionary=True, compression='snappy')

        # Write to the Parquet file in chunks of rows_per_write rows
        for i in tqdm(range(0, len(table), rows_per_write),
                      desc=f"Writing Batch {batch_index + 1}/{num_batches} to Parquet"):
            writer.write_table(table.slice(i, rows_per_write))

    # writer stays None when the input is empty or every item failed;
    # close it (writing the Parquet footer) only if something was written.
    if writer is not None:
        writer.close()
    else:
        print("Warning: no records were written; the output file is empty")

print(f"Completed: {len(data) - failed_count} of {len(data)} records saved as Parquet to {local_path}")
|