# ContentWeaverAI — src/utils.py
# (Hugging Face Space snapshot; commit 2d7fa45, "fix: fix an error", by bisryy)
import os
os.environ['TRANSFORMERS_CACHE'] = '/tmp/.cache'
os.environ['HF_HOME'] = '/tmp/.cache'
os.environ['HF_DATASETS_CACHE'] = '/tmp/.cache'
os.environ['HF_METRICS_CACHE'] = '/tmp/.cache'
import feedparser
import requests
from bs4 import BeautifulSoup
import chromadb
from sentence_transformers import SentenceTransformer
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
import torch
import time
import json
import uuid
from dotenv import load_dotenv
load_dotenv()
# --- GLOBAL SETUP ---
# Heavyweight resources are created once at import time so every call to
# run_newsletter_workflow() reuses them instead of reloading models per request.

# Load embedding model once (used for both document and query embeddings).
print('Loading embedding model...')
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
print('Embedding model loaded.')

# Initialize Chroma client once.
# NOTE(review): this is the in-memory client — the index does not persist
# across process restarts; each workflow run rebuilds it anyway (see below).
client = chromadb.Client()
collection_name = "newsletter_articles"

# Load LLM once. Meta-Llama-3 is a gated model, so a valid HF token is required.
model_id = "meta-llama/Meta-Llama-3-8B-Instruct"
print(f"Loading LLM: {model_id}")
from huggingface_hub import login
# NOTE(review): unusual env-var casing — confirm the .env key really is 'HF_Token'
# (HF tooling conventionally uses HF_TOKEN).
hf_token = os.getenv('HF_Token')
if hf_token:
    login(token=hf_token)
else:
    print("HF_Token not found in environment. Check your .env file.")

tokenizer = AutoTokenizer.from_pretrained(model_id)
if tokenizer.pad_token is None:
    # Ensure a pad token exists by reusing EOS; the generation calls below pass
    # pad_token_id explicitly and would otherwise receive None.
    print("Warning: pad_token is None. Setting pad_token to eos_token.")
    tokenizer.pad_token = tokenizer.eos_token

model = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map="auto",          # let accelerate place layers on available devices
    torch_dtype=torch.bfloat16  # halve memory vs fp32
)
print("LLM loaded.")

llm_pipeline = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    torch_dtype=torch.bfloat16,
    device_map="auto",
)
# --- MAIN FUNCTION ---
def run_newsletter_workflow(prferences_dict):
    """Run the end-to-end newsletter pipeline and return the result as markdown.

    Steps: fetch entries from fixed RSS feeds -> scrape each article's full
    text -> embed and index everything in a fresh Chroma collection -> retrieve
    the articles most relevant to the user's keywords -> summarize each and add
    a short commentary with the LLM -> format a markdown newsletter.

    Args:
        prferences_dict: dict with optional keys "keywords" (list of str) and
            "preferred_tone" (str, defaults to 'informative').
            NOTE(review): the parameter name is misspelled ("prferences"), but
            renaming it would break any caller passing it by keyword, so it is
            left as-is.

    Returns:
        Tuple (newsletter_markdown, status_message). newsletter_markdown is
        None when no keywords were provided.
    """
    user_preferences = {
        "id": str(uuid.uuid4()),
        "keywords": prferences_dict.get("keywords", []),
        "preferred_tone": prferences_dict.get("preferred_tone", 'informative'),
    }
    # Keywords drive the vector-DB query; without them there is nothing to do.
    if not user_preferences["keywords"]:
        return None, "No Keywords provided"

    # Fixed set of AI-related RSS feeds to pull candidate articles from.
    rss_feed_urls = [
        "http://feeds.feedburner.com/TechCrunch/artificial-intelligence",
        "https://news.mit.edu/topic/mitcobrand-artificial-intelligence2-rss.xml",
        "https://hackingbutlegal.com/feed/",
    ]

    def fetch_articles_from_feeds(feed_urls):
        # Parse each feed and collect its entries as dicts. A failing feed is
        # logged and skipped rather than aborting the whole run.
        articles = []
        for url in feed_urls:
            try:
                feed = feedparser.parse(url)
                for entry in feed.entries:
                    articles.append({
                        "id": str(uuid.uuid4()),
                        "title": entry.title,
                        "link": entry.link,
                        "published": entry.get("published", "N/A"),
                        "summary": entry.get("summary", ""),
                        # Prefer the feed's full "content" payload when present;
                        # fall back to the summary at each level of the lookup.
                        "content": entry.get("content", [{"value": entry.get("summary", "")}])[0].get("value", entry.get("summary", ""))
                    })
                print(f"Fetched {len(feed.entries)} entries from {url}")
                time.sleep(1)  # be polite between feed requests
            except Exception as e:
                print(f"Error fetching feed {url}: {e}")
        return articles

    fetched_articles = fetch_articles_from_feeds(rss_feed_urls)
    print(f"\nFetched a total of {len(fetched_articles)} articles.")

    def scrape_article_content(url):
        # Fetch the article page and extract readable text, capped at 5000
        # chars. Returns None on any network or parsing failure so the caller
        # can keep the feed-provided content instead.
        try:
            headers = { 'User-Agent': 'MyNewsletterBot/1.0 (+http://example.com/botinfo)'}
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')
            # Try common "main content" containers first.
            main_content = soup.find('article') or soup.find('main') or soup.find('div', class_ = 'content')
            if main_content:
                text = ' '.join(main_content.stripped_strings)
                return text[:5000]
            else:
                # Fallback: concatenate all paragraph text on the page.
                paragraphs = soup.find_all('p')
                text = ' '.join(p.get_text() for p in paragraphs)
                return text[:5000]
        except requests.exceptions.RequestException as e:
            print(f"Scraping error for {url}: {e}")
            return None
        except Exception as e:
            print(f'Scraping Parsing error for {url}: {e}')
            return None

    # Replace the (often truncated) feed content with the scraped full text
    # whenever scraping succeeds.
    for article in fetched_articles:
        print(f"Attempting to scrape: {article['link']}")
        full_content = scrape_article_content(article['link'])
        if full_content:
            article['content'] = full_content
        time.sleep(2)  # throttle requests across article sites

    # Setup Chroma collection (delete existing, create new) so each run starts
    # from a clean index.
    try:
        client.delete_collection(name=collection_name)
        print(f'Deleted existing collection: {collection_name}')
    except Exception:
        pass  # collection did not exist yet — nothing to delete
    collection = client.create_collection(name=collection_name)
    print(f"Created collection: {collection_name}")

    print("Adding articles to Vector DB...")
    ids_to_add = []
    embeddings_to_add = []
    documents_to_add = []
    metadata_to_add = []

    def clean_text(text):
        # Collapse all whitespace runs (newlines, tabs, repeats) to single spaces.
        return ' '.join(text.split())

    # Build parallel id/document/metadata/embedding lists for a single batched
    # collection.add() call below.
    for article in fetched_articles:
        cleaned_content = clean_text(article['content'])
        if not cleaned_content:
            continue  # skip articles with no usable text
        ids_to_add.append(article['id'])
        documents_to_add.append(cleaned_content)
        metadata_to_add.append({
            "title": article['title'],
            "link": article['link'],
            "published": article['published']
        })
        embedding = embedding_model.encode(cleaned_content, convert_to_tensor=True)
        embeddings_to_add.append(embedding.tolist())

    if ids_to_add:
        collection.add(
            ids=ids_to_add,
            embeddings=embeddings_to_add,
            documents=documents_to_add,
            metadatas=metadata_to_add
        )
        print(f"Added {len(ids_to_add)} articles to the collection.")
    else:
        print("No valid articles found to add to the collection.")

    def retrieve_relevent_articles(query_keywords, top_n=5):
        # Embed the joined keywords and run a nearest-neighbour query against
        # the collection. Returns a Chroma results dict, or [] when the
        # collection is empty — downstream code relies on [] being falsy.
        if collection.count() == 0:
            print("Collection is empty. Cannot retrieve.")
            return []
        query_text = " ".join(query_keywords)
        query_embedding = embedding_model.encode(query_text, convert_to_tensor=False).tolist()
        print(f"\nQuerying for articles related to: '{query_text}'")
        results = collection.query(
            query_embeddings=[query_embedding],
            n_results=top_n,
            include=['metadatas', 'documents']
        )
        print(f'Retrieved {len(results["ids"][0])} articles.')
        return results

    relevent_articles_data = retrieve_relevent_articles(user_preferences['keywords'], top_n=3)
    print("\nRelevent data sample:")
    print(json.dumps(relevent_articles_data, indent=2))

    def generate_summary(article_content, max_length=150):
        # Summarize one article with the LLM. max_length caps the new tokens
        # generated (plus a small margin below).
        max_input_length = 3000  # token budget for the article text itself
        # Truncate via encode/decode so the cut lands on token boundaries.
        truncated_content = tokenizer.decode(
            tokenizer.encode(article_content, max_length=max_input_length, truncation=True)
        )
        messages = [
            {"role": "system", "content": "You are a helpful assistant that summarizes articles concisely."},
            {"role": "user", "content": f"Please summarize the following article:\n\n{truncated_content}\n\nSummary:"}
        ]
        try:
            prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        except Exception:
            # Tokenizer has no chat template — fall back to a plain-text prompt.
            prompt = (
                f"System: You are a helpful assistant that summarizes articles concisely.\n"
                f"User: Please summarize the following article:\n\n{truncated_content}\n\nSummary:\nAssistant:"
            )
        print(f"\nGenerating summary...")
        sequences = llm_pipeline(
            prompt,
            max_new_tokens=max_length + 50,
            do_sample=True,
            temperature=0.7,
            top_k=50,
            top_p=0.95,
            num_return_sequences=1,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=tokenizer.pad_token_id
        )
        try:
            # The pipeline returns prompt + completion; strip the prompt back off.
            # NOTE(review): the "Assistant:" marker only exists in the fallback
            # prompt above — with a real chat template the rfind() misses and the
            # replace() branch does the stripping instead.
            summary = sequences[0]['generated_text']
            assistant_marker = "Assistant:"
            summary_start_index = summary.rfind(assistant_marker)
            if summary_start_index != -1:
                summary = summary[summary_start_index + len(assistant_marker):].strip()
            else:
                summary = summary.replace(prompt, "").strip()
            print("Summary generated.")
            return summary
        except Exception as e:
            print(f"Error processing LLM output: {e}")
            return "Error generating summary."

    # Summarize each retrieved article, keyed by its Chroma document id.
    summaries = {}
    if relevent_articles_data and relevent_articles_data.get('ids'):
        for i, article_id in enumerate(relevent_articles_data['ids'][0]):
            content = relevent_articles_data['documents'][0][i]
            title = relevent_articles_data['metadatas'][0][i]['title']
            print(f"\nProcessing article: {title}")
            summaries[article_id] = generate_summary(content)
            time.sleep(1)  # pause between LLM calls
    else:
        print("No relevent articles retrieved to summarize.")

    def generate_commentary(summary, title, user_tone, max_length=75):
        # Produce a short (1-2 sentence) editorial comment on a summary, in the
        # user's preferred tone. Mirrors generate_summary's prompt/extraction flow.
        messages = [
            {"role": "system", "content": f"You are a content curator writing brief, engaging commentary for a newsletter. Adopt a {user_tone} tone."},
            {"role": "user", "content": f"Write a short comment (1-2 sentences) about the following article summary titled '{title}'. Relate it briefly to general interests in AI if possible, but focus on being engaging.\n\nSummary: {summary}\n\nCommentary:"}
        ]
        try:
            prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        except Exception:
            # Same plain-text fallback as generate_summary.
            prompt = (
                f"System: You are a content curator writing brief, engaging commentary for a newsletter. Adopt a {user_tone} tone.\n"
                f"User: Write a short comment (1-2 sentences) about the following article summary titled '{title}'. Relate it briefly to general interests in AI if possible, but focus on being engaging.\n\nSummary: {summary}\n\nCommentary:\nAssistant:"
            )
        print(f'Generating commentary for: {title}')
        sequences = llm_pipeline(
            prompt,
            max_new_tokens=max_length + 30,
            do_sample=True,
            temperature=0.8,  # slightly hotter than summaries for livelier copy
            top_k=50,
            top_p=0.95,
            num_return_sequences=1,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=tokenizer.pad_token_id,
        )
        try:
            # Same prompt-stripping caveat as in generate_summary.
            commentary = sequences[0]['generated_text']
            assistant_marker = "Assistant:"
            commentary_start_index = commentary.rfind(assistant_marker)
            if commentary_start_index != -1:
                commentary = commentary[commentary_start_index + len(assistant_marker):].strip()
            else:
                commentary = commentary.replace(prompt, "").strip()
            print('Commentary generated.')
            return commentary
        except Exception as e:
            print(f"Error processing LLM output for commentary: {e}")
            return "Error generating commentary"

    # Generate a commentary only for articles that were successfully summarized.
    commentaries = {}
    if relevent_articles_data and relevent_articles_data.get('ids'):
        for i, article_id in enumerate(relevent_articles_data['ids'][0]):
            if article_id in summaries:
                title = relevent_articles_data['metadatas'][0][i]['title']
                summary_text = summaries[article_id]
                commentaries[article_id] = generate_commentary(summary_text, title, user_preferences["preferred_tone"])
                time.sleep(1)  # pause between LLM calls

    def format_newsletter(retrieved_data, summaries_dict, commentaries_dict):
        # Assemble the final markdown: one section per retrieved article with
        # source link, publish date, summary, and optional commentary.
        newsletter = "# Your AI Agent & Workflow Digest 📰\n\n"
        newsletter += "Here are some articles curated based on your interests:\n\n"
        # retrieved_data may be [] (empty collection) or a dict with empty ids.
        if not retrieved_data or not retrieved_data.get('ids') or not retrieved_data['ids'][0]:
            newsletter += "No relevant articles found this time."
            return newsletter
        for i, article_id in enumerate(retrieved_data['ids'][0]):
            metadata = retrieved_data['metadatas'][0][i]
            summary = summaries_dict.get(article_id, "Summary not available.")
            commentary = commentaries_dict.get(article_id, "")
            newsletter += f"## {metadata['title']}\n\n"
            newsletter += f"**Source:** [{metadata['link']}]({metadata['link']})\n"
            newsletter += f"**Published:** {metadata['published']}\n\n"
            newsletter += f"**Summary:** {summary}\n\n"
            if commentary:
                newsletter += f"**Quick Take:** {commentary}\n\n"
            newsletter += "---\n\n"
        return newsletter

    final_newsletter = format_newsletter(relevent_articles_data, summaries, commentaries)
    print("\n\n--- GENERATED NEWSLETTER ---")
    print(final_newsletter)
    print("--- END OF NEWSLETTER ---")
    return final_newsletter, "Newsletter generation successful."