gourisankar85's picture
Upload 10 files
1fed7a3 verified
import json
import os
import time
import logging
from pathlib import Path
# Create a list to store logs
logs = []
# Helper function to ensure directory exists
def ensure_directory_exists(filepath):
"""Ensure the directory for a given file path exists."""
directory = os.path.dirname(filepath)
if not os.path.exists(directory):
os.makedirs(directory)
# Helper function for adaptive delay
def adaptive_delay(attempt, max_delay=60):
"""Increase wait time with each retry."""
delay = min(5 * attempt, max_delay) # Max delay of max_delay seconds
logging.info(f"Retrying after {delay} seconds...")
time.sleep(delay)
def load_config(config_file="config.json"):
"""Load configuration from the config file."""
try:
with open(config_file, "r", encoding="utf-8") as f:
config = json.load(f)
config['output_file_extension'] = f"{config['model_name']}_noise_{config['noise_rate']}_passage_{config['passage_num']}_num_queries_{config['num_queries']}"
return config
except Exception as e:
logging.info(f"Error loading config: {e}")
return {}
def update_config(config, model_name=None, noise_rate=None, num_queries=None):
"""
Update the config dictionary with user-provided values.
Args:
config (dict): The configuration dictionary to update.
model_name (str, optional): The model name to update in the config.
noise_rate (float, optional): The noise rate to update in the config.
num_queries (int, optional): The number of queries to update in the config.
Returns:
dict: The updated configuration dictionary.
"""
if model_name:
config['model_name'] = model_name
if noise_rate is not None: # Explicitly check for None to handle 0.0
config['noise_rate'] = float(noise_rate) # Ensure it's a float
if num_queries is not None: # Explicitly check for None to handle 0
config['num_queries'] = int(num_queries) # Ensure it's an integer
config['output_file_extension'] = f"{config['model_name']}_noise_{config['noise_rate']}_passage_{config['passage_num']}_num_queries_{config['num_queries']}"
return config
def load_dataset(file_name):
dataset = []
with open('data/' + file_name, "r", encoding="utf-8") as f:
for line in f:
dataset.append(json.loads(line.strip())) # Load each JSON object per line
logging.info(f"Loaded {len(dataset)} entries from file {file_name}") # Check how many records were loaded
return dataset
def initialize_logging():
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Custom log handler to capture logs and add them to the logs list
class LogHandler(logging.Handler):
def emit(self, record):
log_entry = self.format(record)
logs.append(log_entry)
# Add custom log handler to the logger
log_handler = LogHandler()
log_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
logger.addHandler(log_handler)
def get_logs():
"""Retrieve logs for display."""
return "\n".join(logs[-1000:])
def load_used_data(filepath):
"""Loads existing processed data to avoid redundant evaluations."""
used_data = {}
if Path(filepath).exists():
with open(filepath, encoding='utf-8') as f:
for line in f:
data = json.loads(line)
used_data[data['id']] = data
return used_data
def update_logs_periodically():
while True:
time.sleep(2) # Wait for 2 seconds
yield get_logs()