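# NOTE: the original first lines ("Spaces: / Sleeping / Sleeping") appear to be
# page-status residue from the site this file was copied from, not program text.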
"""
Data preprocessing for fine-tuning on Iain Morris articles.
"""
import json
import logging
import os
import random
import re
from typing import List, Dict, Tuple

import pandas as pd
from datasets import Dataset
# Configure INFO-level logging at import time and create the module-scoped
# logger that every pipeline step below reports through.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ArticlePreprocessor:
    """Turn raw article JSON into instruction/chat fine-tuning datasets.

    ``process_articles`` runs the whole pipeline: load the articles, clean
    their text, build one instruction/response example per article, merge
    optional supplementary examples, convert to chat format, split into
    train/validation sets and write the results to disk.
    """

    # Title keywords counted as markers of a provocative headline.
    _PROVOCATIVE_WORDS = (
        'danger', 'threat', 'crisis', 'disaster', 'collapse', 'failure', 'fiasco',
        'wreck', 'crash', 'collision', 'explosion', 'doom', 'catastrophe',
        'doubt', 'question', 'challenge', 'attack', 'battle', 'war', 'fight',
        'gross', 'massive', 'huge', 'epic', 'monster', 'brutal',
    )
    # Keywords are anchored at the start of a word: inflections still count
    # ("threatens", "wars") but mid-word hits such as "war" in "software" do not.
    # Words that merely begin with a keyword ("warning") are still counted.
    _TITLE_PATTERNS = tuple(
        re.compile(r'\b' + re.escape(word)) for word in _PROVOCATIVE_WORDS
    )
    # Content patterns for negative analogies.
    _ANALOGY_PATTERNS = tuple(re.compile(pattern) for pattern in (
        r'train wreck', r'off the rails', r'collision', r'explosion', r'fiasco',
        r'disaster', r'catastrophe', r'meltdown', r'implosion', r'crash',
        r'like.*disaster', r'as.*wreck', r'resembl.*catastrophe',
    ))
    # Content patterns for sarcastic/cynical phrasing.
    _CYNICAL_PATTERNS = tuple(re.compile(pattern) for pattern in (
        r'of course', r'naturally', r'predictably', r'unsurprisingly',
        r'needless to say', r'obviously', r'clearly', r'evidently',
    ))
    # Title keywords that switch extract_topic_from_title to its
    # "controversy" framing (plain substring match on the lower-cased title).
    _PROVOCATIVE_STARTERS = (
        'danger', 'threat', 'crisis', 'disaster', 'collapse', 'failure',
        'doubt', 'question', 'challenge', 'attack', 'battle',
    )
    # Boilerplate removed by clean_content, applied in this order. Patterns
    # ending in ``.*?$`` with DOTALL drop everything from the marker to the
    # end of the text.
    _BOILERPLATE_PATTERNS = (
        re.compile(r'Light Reading.*?All rights reserved\.?', re.IGNORECASE),
        re.compile(r'Copyright.*?Light Reading', re.IGNORECASE),
        re.compile(r'Copyright.*?Informa.*?TechTarget.*?registered office.*?', re.IGNORECASE | re.DOTALL),
        re.compile(r'You May Also Like.*?$', re.IGNORECASE | re.DOTALL),
        re.compile(r'Featured Story.*?$', re.IGNORECASE | re.DOTALL),
        re.compile(r'Read more about:.*?$', re.IGNORECASE | re.DOTALL),
        re.compile(r'Subscribe.*?newsletter', re.IGNORECASE),
        re.compile(r'Follow.*?Twitter', re.IGNORECASE),
        # Author bio section (appears at the end of the article).
        re.compile(r'Iain Morris International Editor, Light Reading.*?$', re.IGNORECASE | re.DOTALL),
        # Advertisement markers.
        # NOTE(review): the second pattern is case-insensitive, so it also
        # deletes the ordinary word "advertisement" from running text.
        re.compile(r'\[Advertisement\]', re.IGNORECASE),
        re.compile(r'ADVERTISEMENT', re.IGNORECASE),
    )
    # Typographic quotes/apostrophes mapped to their ASCII equivalents.
    _QUOTE_TABLE = str.maketrans({
        '\u201c': '"', '\u201d': '"',
        '\u2018': "'", '\u2019': "'",
    })
    # (path, label) of optional chat-format files merged into the training set.
    _SUPPLEMENTARY_FILES = (
        ('data/additional_training_examples.json', 'additional'),
        ('data/expanded_train_dataset.json', 'expanded'),
    )

    def __init__(self, min_content_length: int = 500, max_content_length: int = 8000):
        """
        Initialize the preprocessor
        Args:
            min_content_length: Shortest raw content (in characters) accepted by filter_articles
            max_content_length: Longest raw content (in characters) accepted by filter_articles
        """
        self.min_content_length = min_content_length
        self.max_content_length = max_content_length
        self.system_prompt = """You are Iain Morris, a veteran telecom journalist with a razor-sharp pen and zero tolerance for industry BS. Your writing style is distinctive for:
PROVOCATIVE TITLES & OPENINGS:
- Always lead with conflict, failure, or impending doom
- Use dramatic, negative framing even for mundane topics
- Open with vivid scenarios that immediately establish tension
- Frame everything as battles, collisions, or disasters waiting to happen
SIGNATURE NEGATIVE ANALOGIES:
- Compare industry situations to train wrecks, collisions, explosions
- Use visceral, physical metaphors for business problems
- Reference pop culture disasters and failures
- Turn technical concepts into dramatic, often dark imagery
WRITING TECHNIQUE:
- Cynical, sarcastic commentary on industry players
- Technical expertise delivered with biting wit
- Assume readers are intelligent but skeptical
- Build articles around conflict narratives
- Use parenthetical asides for extra snark
- Quote industry figures, then immediately undercut them
Write compelling telecom news articles that grab readers by the throat from the first sentence and never let go."""

    def load_articles(self, filepath: str) -> List[Dict]:
        """
        Load articles from JSON file
        Args:
            filepath: Path to the JSON file containing articles
        Returns:
            List of article dictionaries; an empty list if the file cannot be
            read, is not valid JSON, or does not contain a top-level list
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                articles = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both JSON and text-decoding errors.
            logger.error(f"Error loading articles: {e}")
            return []
        if not isinstance(articles, list):
            # Every later step iterates over article dicts, so reject other shapes here.
            logger.error(f"Error loading articles: expected a JSON list, got {type(articles).__name__}")
            return []
        logger.info(f"Loaded {len(articles)} articles from {filepath}")
        return articles

    @staticmethod
    def _normalize_whitespace(text: str) -> str:
        """Collapse runs of spaces/tabs to one space and 3+ line breaks to a paragraph break."""
        text = re.sub(r'[ \t]+', ' ', text)
        return re.sub(r'\n\s*\n\s*\n+', '\n\n', text)

    def clean_content(self, content: str) -> str:
        """
        Clean article content for training
        Args:
            content: Raw article content
        Returns:
            Cleaned content ("" for empty or missing input)
        """
        if not content:
            return ""
        # Remove URLs
        content = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', content)
        # Remove email addresses
        content = re.sub(r'\S+@\S+', '', content)
        # Normalize whitespace first so the multi-word boilerplate patterns
        # below see single spaces, while paragraph breaks are preserved.
        content = self._normalize_whitespace(content)
        # Strip site headers/footers, author bio and advertisement markers.
        for pattern in self._BOILERPLATE_PATTERNS:
            content = pattern.sub('', content)
        # Replace curly quotes and apostrophes with plain ASCII ones.
        content = content.translate(self._QUOTE_TABLE)
        # The removals above can leave doubled spaces or blank-line runs behind.
        content = self._normalize_whitespace(content)
        # Remove trailing whitespace on every line, then trim the whole text.
        content = '\n'.join(line.rstrip() for line in content.split('\n'))
        return content.strip()

    def _style_score(self, title: str, content: str) -> int:
        """Count how many title keywords, analogy patterns and cynical patterns match."""
        title_lower = (title or '').lower()
        content_lower = (content or '').lower()
        title_score = sum(1 for pattern in self._TITLE_PATTERNS if pattern.search(title_lower))
        analogy_score = sum(1 for pattern in self._ANALOGY_PATTERNS if pattern.search(content_lower))
        cynical_score = sum(1 for pattern in self._CYNICAL_PATTERNS if pattern.search(content_lower))
        return title_score + analogy_score + cynical_score

    def has_provocative_elements(self, title: str, content: str) -> bool:
        """
        Check if article has Iain Morris's provocative style elements
        Args:
            title: Article title
            content: Article content
        Returns:
            True if at least 2 style elements (title keywords, analogy
            patterns, cynical patterns) are found
        """
        return self._style_score(title, content) >= 2

    def extract_topic_from_title(self, title: str) -> str:
        """
        Extract a topic prompt from the article title, preserving provocative framing
        Args:
            title: Article title
        Returns:
            Topic prompt for training
        """
        title_lower = title.lower()
        is_provocative = any(starter in title_lower for starter in self._PROVOCATIVE_STARTERS)
        # Both branches drop a single trailing question mark.
        topic = title[:-1] if title.endswith('?') else title
        if is_provocative:
            # Keep the provocative framing
            return f"Analyze the controversy and implications of: {topic}"
        # Standard topic extraction for less provocative titles: titles that
        # are not already phrased as a question get a generic lead-in.
        if not topic.lower().startswith(('what', 'how', 'why', 'when', 'where', 'who')):
            topic = f"Discuss the industry implications of {topic.lower()}"
        # Add context if too short
        if len(topic.split()) < 3:
            topic = f"Write about {topic} in the telecom industry"
        return topic

    def filter_articles(self, articles: List[Dict]) -> List[Dict]:
        """
        Filter articles based on quality criteria and prioritize provocative style
        Args:
            articles: List of article dictionaries
        Returns:
            Filtered list of articles, sorted by style strength (highest
            first; ties keep their input order)
        """
        scored = []
        for article in articles:
            content = article.get('content', '')
            title = article.get('title', '')
            # Skip if missing essential fields
            if not content or not title:
                continue
            # Skip if content is too short or too long (measured on the raw text)
            if not self.min_content_length <= len(content) <= self.max_content_length:
                continue
            # Skip if title is too generic
            if len(title.split()) < 3:
                continue
            # Skip if content seems to be mostly navigation/UI elements
            if content.count('Click') > 5 or content.count('Subscribe') > 3:
                continue
            scored.append((article, self._style_score(title, self.clean_content(content))))
        # Stable sort, so equally scored articles stay in input order.
        scored.sort(key=lambda pair: pair[1], reverse=True)
        filtered = [article for article, _ in scored]
        # Count articles with strong style elements
        strong_style_count = sum(1 for _, score in scored if score >= 2)
        logger.info(f"Filtered {len(articles)} articles down to {len(filtered)} quality articles")
        logger.info(f"Articles with strong Iain Morris style elements: {strong_style_count}")
        return filtered

    def create_training_examples(self, articles: List[Dict]) -> List[Dict]:
        """
        Create training examples in instruction-response format
        Args:
            articles: List of article dictionaries
        Returns:
            List of training examples with 'instruction', 'input', 'output'
            and 'system' keys; articles without a title or without any
            content left after cleaning are skipped
        """
        training_examples = []
        for article in articles:
            title = article.get('title', '')
            content = self.clean_content(article.get('content', ''))
            if not title or not content:
                continue
            # Create topic prompt from title
            topic = self.extract_topic_from_title(title)
            training_examples.append({
                'instruction': f"Write a telecom industry news article about: {topic}",
                'input': "",
                'output': f"# {title}\n\n{content}",
                'system': self.system_prompt,
            })
        logger.info(f"Created {len(training_examples)} training examples")
        return training_examples

    def create_chat_format(self, examples: List[Dict]) -> List[Dict]:
        """
        Convert examples to chat format for training
        Args:
            examples: List of training examples
        Returns:
            List of examples in chat format: one system, one user and one
            assistant message per example
        """
        return [
            {
                'messages': [
                    {'role': 'system', 'content': example['system']},
                    {'role': 'user', 'content': example['instruction']},
                    {'role': 'assistant', 'content': example['output']},
                ]
            }
            for example in examples
        ]

    def split_dataset(self, examples: List[Dict], train_ratio: float = 0.9) -> Tuple[List[Dict], List[Dict]]:
        """
        Split dataset into train and validation sets
        Args:
            examples: List of training examples
            train_ratio: Ratio of examples to use for training
        Returns:
            Tuple of (train_examples, val_examples)
        """
        split_idx = int(len(examples) * train_ratio)
        # Shuffle a copy with a private, fixed-seed generator: the split is
        # reproducible and the process-wide random state is left untouched.
        shuffled = list(examples)
        random.Random(42).shuffle(shuffled)
        train_examples = shuffled[:split_idx]
        val_examples = shuffled[split_idx:]
        logger.info(f"Split dataset: {len(train_examples)} train, {len(val_examples)} validation")
        return train_examples, val_examples

    def save_dataset(self, examples: List[Dict], filepath: str):
        """
        Save dataset to JSON file
        Args:
            examples: List of examples
            filepath: Output file path; missing parent directories are created
        """
        parent_dir = os.path.dirname(filepath)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(examples, f, indent=2, ensure_ascii=False)
        logger.info(f"Saved {len(examples)} examples to {filepath}")

    # The return annotation is quoted so it is not evaluated when the class is defined.
    def create_hf_dataset(self, examples: List[Dict]) -> "Dataset":
        """
        Create Hugging Face Dataset object
        Args:
            examples: List of training examples
        Returns:
            Hugging Face Dataset
        """
        return Dataset.from_list(examples)

    def _load_supplementary_examples(self, path: str, label: str) -> List[Dict]:
        """Best-effort load of a chat-format file, converted to instruction-format examples.

        Entries without a 'messages' list of at least three items are skipped.
        On any read, parse or shape error a warning is logged and whatever was
        converted up to that point is returned.
        """
        converted = []
        try:
            with open(path, 'r', encoding='utf-8') as f:
                raw_examples = json.load(f)
            logger.info(f"Loaded {len(raw_examples)} {label} training examples")
            for example in raw_examples:
                if 'messages' in example and len(example['messages']) >= 3:
                    # The first three messages are read as system, user, assistant.
                    system_msg = example['messages'][0]['content']
                    user_msg = example['messages'][1]['content']
                    assistant_msg = example['messages'][2]['content']
                    converted.append({
                        'instruction': user_msg,
                        'input': "",
                        'output': assistant_msg,
                        'system': system_msg,
                    })
        except (OSError, ValueError, TypeError, KeyError, IndexError) as e:
            logger.warning(f"Could not load {os.path.basename(path)}: {e}")
        return converted

    def process_articles(self, input_file: str, output_dir: str = "data"):
        """
        Complete preprocessing pipeline
        Args:
            input_file: Path to raw articles JSON file
            output_dir: Directory to save processed data (created if missing)
        """
        logger.info("Starting article preprocessing pipeline")
        # Load articles
        articles = self.load_articles(input_file)
        if not articles:
            logger.error("No articles loaded, exiting")
            return
        # Filtering is currently disabled: every loaded article is used.
        # Re-enable with: filtered_articles = self.filter_articles(articles)
        filtered_articles = articles
        if not filtered_articles:
            logger.error("No articles passed filtering, exiting")
            return
        # Create training examples
        training_examples = self.create_training_examples(filtered_articles)
        if not training_examples:
            logger.error("No training examples created, exiting")
            return
        # Merge in the optional supplementary files; each one is best-effort.
        logger.info("Loading additional training examples from supplementary files")
        for path, label in self._SUPPLEMENTARY_FILES:
            training_examples.extend(self._load_supplementary_examples(path, label))
        logger.info(f"Total training examples after adding supplementary data: {len(training_examples)}")
        # Convert to chat format
        chat_examples = self.create_chat_format(training_examples)
        # Split dataset
        train_examples, val_examples = self.split_dataset(chat_examples)
        # Save datasets
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        self.save_dataset(train_examples, f"{output_dir}/train_dataset.json")
        self.save_dataset(val_examples, f"{output_dir}/val_dataset.json")
        self.save_dataset(training_examples, f"{output_dir}/processed_dataset.json")
        # Create and save HF datasets
        train_dataset = self.create_hf_dataset(train_examples)
        val_dataset = self.create_hf_dataset(val_examples)
        train_dataset.save_to_disk(f"{output_dir}/train_hf_dataset")
        val_dataset.save_to_disk(f"{output_dir}/val_hf_dataset")
        # Print summary. A very small dataset can leave the train split empty,
        # so the average is guarded against division by zero.
        total_chars = sum(len(ex['messages'][2]['content']) for ex in train_examples)
        average_chars = total_chars // len(train_examples) if train_examples else 0
        print("\nPreprocessing Summary:")
        print(f"Original articles: {len(articles)}")
        print(f"Filtered articles: {len(filtered_articles)}")
        print(f"Training examples: {len(train_examples)}")
        print(f"Validation examples: {len(val_examples)}")
        print(f"Average article length: {average_chars} characters")
        # Show sample
        if train_examples:
            print("\nSample training example:")
            sample = train_examples[0]
            print(f"User: {sample['messages'][1]['content'][:100]}...")
            print(f"Assistant: {sample['messages'][2]['content'][:200]}...")
def main():
    """
    Run the preprocessing pipeline on data/raw_articles.json with the default output directory
    """
    ArticlePreprocessor().process_articles("data/raw_articles.json")


if __name__ == "__main__":
    main()