Spaces:
Sleeping
Sleeping
""" | |
Utilities for analyzing and understanding questions. | |
""" | |
import re | |
import json | |
import os | |
from typing import Dict, Any, List, Optional, Tuple, Set | |
class QuestionAnalyzer:
    """
    Analyze questions and locate the resource file and expected answer
    associated with them.

    Metadata is read from a JSON-lines file (one JSON object per line).
    Entries that carry a ``task_id`` are indexed in ``self.metadata``.
    """

    # Metadata fields that may hold the expected answer, in priority order.
    _ANSWER_FIELDS = ('answer', 'Final answer', 'expected_answer')

    # File extensions recognised when scanning a question for a file name.
    _FILE_EXTENSIONS = (
        '.xlsx', '.xls', '.csv', '.txt', '.pdf', '.jpg', '.jpeg',
        '.png', '.docx', '.pptx', '.json', '.jsonld', '.zip', '.pdb', '.py',
    )

    # "attached <kind> [named |called |"]<token>"; group 1 is the optional
    # introducer, group 2 the candidate file name.
    _ATTACHED_PATTERN = re.compile(
        r'attached (?:file|spreadsheet|document|image|picture|pdf|excel|csv|text file|zip|archive) '
        r'(named |called |")?([\w\.-]+)',
        re.IGNORECASE,
    )

    _UUID_PATTERN = re.compile(
        r'([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})',
        re.IGNORECASE,
    )

    # Words ignored by _extract_keywords.
    _STOP_WORDS = frozenset({
        'a', 'an', 'the', 'and', 'or', 'but', 'if', 'then', 'else', 'when',
        'at', 'from', 'by', 'for', 'with', 'about', 'against', 'between',
        'into', 'through', 'during', 'before', 'after', 'above', 'below',
        'to', 'of', 'in', 'on', 'is', 'are', 'was', 'were', 'be', 'been',
        'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did',
        'doing', 'would', 'should', 'could', 'might', 'will', 'shall',
        'can', 'may', 'must', 'ought',
    })

    def __init__(self, resource_dir: str, metadata_path: Optional[str] = None):
        """
        Initialize the question analyzer.

        Args:
            resource_dir: Directory containing resource files
            metadata_path: Path to the metadata file (optional); defaults to
                ``metadata.jsonl`` inside ``resource_dir``
        """
        self.resource_dir = resource_dir
        self.metadata_path = metadata_path or os.path.join(resource_dir, 'metadata.jsonl')
        self.metadata = self._load_metadata()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _read_metadata_entries(self, error_prefix: str) -> List[Dict[str, Any]]:
        """
        Read every JSON object from the metadata file.

        Blank lines, malformed lines and non-object JSON values are skipped
        individually so that one bad line does not hide the entries after it.

        Args:
            error_prefix: Text printed before the error if the file cannot be read

        Returns:
            List of metadata entries (empty if the file is missing or unreadable)
        """
        entries: List[Dict[str, Any]] = []
        if not os.path.exists(self.metadata_path):
            return entries
        try:
            with open(self.metadata_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        entry = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    if isinstance(entry, dict):
                        entries.append(entry)
        except (OSError, UnicodeDecodeError) as e:
            print(f"{error_prefix}: {e}")
        return entries

    def _list_resource_files(self) -> List[str]:
        """
        List the names in the resource directory in sorted order.

        Sorting makes "first match wins" lookups deterministic. A missing or
        unreadable directory yields an empty list instead of raising.
        """
        try:
            return sorted(os.listdir(self.resource_dir))
        except OSError:
            return []

    def _existing_resource_path(self, file_name: Any) -> Optional[str]:
        """
        Join ``file_name`` onto the resource directory if that path exists.

        Returns None for non-string or blank names; joining an empty name
        would otherwise yield the resource directory itself.
        """
        if not isinstance(file_name, str) or not file_name.strip():
            return None
        file_path = os.path.join(self.resource_dir, file_name)
        return file_path if os.path.exists(file_path) else None

    def _pick_answer(self, entry: Dict[str, Any]) -> Any:
        """
        Return the first non-empty answer field of a metadata entry, or None.
        """
        for field in self._ANSWER_FIELDS:
            value = entry.get(field)
            if value is not None and value != '':
                return value
        return None

    def _load_metadata(self) -> Dict[str, Dict[str, Any]]:
        """
        Load metadata from the metadata file.

        Returns:
            Dictionary mapping task IDs to metadata
        """
        metadata: Dict[str, Dict[str, Any]] = {}
        for entry in self._read_metadata_entries("Error loading metadata"):
            task_id = entry.get('task_id')
            if not task_id:
                continue
            try:
                metadata[task_id] = entry
            except TypeError:
                # Unhashable task_id (e.g. a list); such an entry cannot be indexed.
                continue
        return metadata

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def extract_file_mention(self, question: str) -> Optional[str]:
        """
        Extract mentioned file name from the question.

        Args:
            question: The question to analyze

        Returns:
            Mentioned file name, or None if no file is mentioned
        """
        # "attached file named X" / "attached spreadsheet X.xlsx" patterns.
        match = self._ATTACHED_PATTERN.search(question)
        if match:
            introducer, token = match.groups()
            # The character class admits '.' and '-', so a sentence-ending
            # period would otherwise become part of the name.
            token = token.rstrip('.-')
            # Without "named"/"called"/a quote, the next word is only a file
            # name if it looks like one; otherwise ("attached file is ...")
            # fall through to the extension scan.
            if token and (introducer or os.path.splitext(token)[1]):
                return token

        # Look for a word ending in a known extension. The negative lookahead
        # stops '.json' from matching the front of 'name.jsonld'.
        for ext in self._FILE_EXTENSIONS:
            pattern = r'(\w+(?:-\w+)*' + re.escape(ext) + r')(?!\w)'
            match = re.search(pattern, question, re.IGNORECASE)
            if match:
                return match.group(1)
        return None

    def find_relevant_file(self, question: str, task_id: Optional[str] = None) -> Optional[str]:
        """
        Find the relevant file for a question.

        Strategies are tried in order: metadata lookup by task ID, exact
        question match in metadata, file name mentioned in the question,
        UUID mentioned in the question, keyword overlap with file names
        (only when the task ID is known), and finally a similar question in
        the metadata file.

        Args:
            question: The question to analyze
            task_id: The task ID (optional)

        Returns:
            Path to the relevant file, or None if no file is found
        """
        # Check if task_id is in metadata and has a file_name
        if task_id and task_id in self.metadata:
            file_path = self._existing_resource_path(self.metadata[task_id].get('file_name'))
            if file_path:
                print(f"Found file in metadata for task_id {task_id}: {file_path}")
                return file_path

        # Without a task_id, look for a metadata entry with the same question
        if not task_id:
            for entry in self.metadata.values():
                if entry.get('Question') and entry.get('Question') == question:
                    file_path = self._existing_resource_path(entry.get('file_name'))
                    if file_path:
                        print(f"Found file in metadata by matching question: {file_path}")
                        return file_path

        resource_files = self._list_resource_files()

        # Extract file mention from question
        file_mention = self.extract_file_mention(question)
        if file_mention:
            file_path = self._existing_resource_path(file_mention)
            if file_path:
                print(f"Found file by direct mention: {file_path}")
                return file_path
            # Check if there's a file with a similar name
            mention_lower = file_mention.lower()
            for file_name in resource_files:
                if mention_lower in file_name.lower():
                    file_path = os.path.join(self.resource_dir, file_name)
                    print(f"Found file by partial name match: {file_path}")
                    return file_path

        # Look for a UUID in the question which might be a file name without
        # extension. The pattern is case-insensitive, so the comparison is too.
        uuid_match = self._UUID_PATTERN.search(question)
        if uuid_match:
            uuid_lower = uuid_match.group(1).lower()
            for file_name in resource_files:
                if uuid_lower in file_name.lower():
                    file_path = os.path.join(self.resource_dir, file_name)
                    print(f"Found file by UUID match: {file_path}")
                    return file_path

        # For known tasks, pick the file whose name shares the most keywords
        # with the question.
        if task_id and task_id in self.metadata:
            keywords = self._extract_keywords(question)
            skipped = {'metadata.jsonl', os.path.basename(self.metadata_path)}
            best_match = None
            best_score = 0
            for file_name in resource_files:
                if file_name in skipped:
                    continue
                name_lower = file_name.lower()
                score = sum(1 for keyword in keywords if keyword in name_lower)
                if score > best_score:
                    best_score = score
                    best_match = file_name
            if best_match:
                file_path = os.path.join(self.resource_dir, best_match)
                print(f"Found file by keyword matching: {file_path}")
                return file_path

        # If still no match, look for a similar question in the metadata file
        # (this also covers entries that have no task_id).
        for entry in self._read_metadata_entries("Error searching metadata for similar questions"):
            other_question = entry.get('Question')
            if not isinstance(other_question, str) or not other_question:
                continue
            if not entry.get('file_name'):
                continue
            if self._questions_are_similar(question, other_question):
                file_path = self._existing_resource_path(entry['file_name'])
                if file_path:
                    print(f"Found file by similar question in metadata: {file_path}")
                    return file_path
        return None

    def _questions_are_similar(self, q1: str, q2: str) -> bool:
        """
        Check if two questions are similar.

        Similarity is the Jaccard index of the two lower-cased,
        punctuation-stripped word sets.

        Args:
            q1: First question
            q2: Second question

        Returns:
            True if the similarity is above 0.5, False otherwise
        """
        words1 = set(re.sub(r'[^\w\s]', '', q1.lower()).split())
        words2 = set(re.sub(r'[^\w\s]', '', q2.lower()).split())
        union = len(words1 | words2)
        if union == 0:
            return False
        return len(words1 & words2) / union > 0.5

    def _extract_keywords(self, text: str) -> Set[str]:
        """
        Extract keywords from text.

        Args:
            text: The text to analyze

        Returns:
            Set of lower-cased words longer than two characters that are not
            stop words
        """
        words = re.findall(r'\b\w+\b', text.lower())
        return {word for word in words if word not in self._STOP_WORDS and len(word) > 2}

    def analyze_question(self, question: str, task_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Analyze a question to understand what it's asking.

        Args:
            question: The question to analyze
            task_id: The task ID (optional); if omitted, a ``task_id: <id>``
                marker inside the question is used when present

        Returns:
            Dictionary with keys 'question', 'task_id', 'file_path',
            'keywords' and 'expected_answer'
        """
        result: Dict[str, Any] = {
            'question': question,
            'task_id': task_id,
            'file_path': None,
            'keywords': list(self._extract_keywords(question)),
            'expected_answer': None,
        }

        # Try to extract task_id from the question if not provided
        if not task_id:
            task_id_match = re.search(r'task_id[: ]+([\w\-]+)', question, re.IGNORECASE)
            if task_id_match:
                task_id = task_id_match.group(1)
                result['task_id'] = task_id

        # Find relevant file
        file_path = self.find_relevant_file(question, task_id)
        if file_path:
            result['file_path'] = file_path

        # An expected answer can only be looked up for a known task id.
        # Comparing a missing id would match every entry that lacks one.
        if not task_id:
            return result

        if task_id in self.metadata:
            result['expected_answer'] = self._pick_answer(self.metadata[task_id])

        if result['expected_answer'] is None:
            for entry in self._read_metadata_entries("Error searching metadata for expected answer"):
                # Metadata uses 'Question' elsewhere in this class; accept both spellings.
                question_text = entry.get('question') or entry.get('Question')
                mentions_task = isinstance(question_text, str) and str(task_id) in question_text
                if entry.get('task_id') == task_id or mentions_task:
                    answer = self._pick_answer(entry)
                    if answer is not None:
                        result['expected_answer'] = answer
                        break
        return result

    def find_file_by_task_id(self, task_id: str) -> Optional[str]:
        """
        Find a file path by task_id in metadata.

        Args:
            task_id: The task ID

        Returns:
            File path if found, None otherwise
        """
        if not task_id:
            return None

        # Check if we have this task_id in our metadata
        if task_id in self.metadata:
            file_path = self._existing_resource_path(self.metadata[task_id].get('file_name'))
            if file_path:
                print(f"Found file in metadata for task_id {task_id}: {file_path}")
                return file_path

        # Search through the metadata file again to find the task_id
        for entry in self._read_metadata_entries("Error searching metadata for file by task_id"):
            if entry.get('task_id') != task_id:
                continue
            file_name = entry.get('file_name')
            # A blank file_name means "no file"; joining it would return the
            # resource directory itself.
            if not isinstance(file_name, str) or not file_name.strip():
                continue
            file_path = self._existing_resource_path(file_name)
            if file_path:
                print(f"Found file in metadata for task_id {task_id}: {file_path}")
                return file_path
            # If the file doesn't exist with the exact path, look for a file
            # whose name contains the task id.
            for existing_file in self._list_resource_files():
                if str(task_id) in existing_file:
                    file_path = os.path.join(self.resource_dir, existing_file)
                    print(f"Found file matching task_id {task_id}: {file_path}")
                    return file_path
        return None