Spaces:
Running
Running
from app import Plugin | |
import streamlit as st | |
import sqlite3 | |
import requests | |
from bs4 import BeautifulSoup | |
from datetime import datetime | |
import ollama | |
from global_vars import t, translations | |
# Register the English UI strings specific to this plugin.
translations["en"].update(
    scansite_title="News Aggregator",
    total_links="Total number of links",
    annotated_links="Number of annotated links",
    known_tags="Known tags",
    reset_database="Reset database",
    database_reset_success="Database reset successfully",
    launch_scan="Launch scan",
    scan_complete="Scan complete",
    no_articles="No articles to display.",
    page="Page",
    previous_page="Previous page",
    next_page="Next page",
    new_articles="New Articles",
    rated_articles="Rated Articles",
    clicked_not_rated="Clicked but not rated Articles",
    tagged_articles="Tagged Articles",
    ignored_articles="Ignored Articles",
    excluded_articles="Excluded Articles",
    rating="Rating",
    tags="Tags",
    exclude="Exclude",
    sources="Sources",
    update="Update",
    delete="Delete",
    add_new_source="Add a new source (URL)",
    add_source="Add source",
    new_tag="New tag",
    new_tag_description="New tag description",
    add_tag="Add tag",
    work_directory="Work Directory",
)
# Register the French UI strings specific to this plugin.
translations["fr"].update(
    scansite_title="Agrégateur de Nouvelles",
    total_links="Nombre total de liens",
    annotated_links="Nombre de liens annotés",
    known_tags="Tags connus",
    reset_database="Réinitialiser la base de données",
    database_reset_success="Base de données réinitialisée",
    launch_scan="Lancer le scan",
    scan_complete="Scan terminé",
    no_articles="Aucun article à afficher.",
    page="Page",
    previous_page="Page précédente",
    next_page="Page suivante",
    new_articles="Nouveaux Articles",
    rated_articles="Articles Notés",
    clicked_not_rated="Articles Cliqués non notés",
    tagged_articles="Articles Tagués",
    ignored_articles="Articles Ignorés",
    excluded_articles="Articles Exclus",
    rating="Note",
    tags="Tags",
    exclude="Exclure",
    sources="Sources",
    update="Mettre à jour",
    delete="Supprimer",
    add_new_source="Ajouter une nouvelle source (URL)",
    add_source="Ajouter source",
    new_tag="Nouveau tag",
    new_tag_description="Description du nouveau tag",
    add_tag="Ajouter tag",
    work_directory="Répertoire de travail",
)
class ScansitePlugin(Plugin): | |
def __init__(self, name, plugin_manager):
    """Initialise the plugin and prepare its SQLite storage.

    Forwards ``name`` and ``plugin_manager`` to the base initialiser, then
    opens the database connection, keeps one shared cursor on ``self.c``
    and runs ``init_db`` to make sure the schema exists.
    """
    super().__init__(name, plugin_manager)
    connection = self.get_connection()
    self.conn = connection
    self.c = connection.cursor()
    self.init_db()
def get_connection(self, db_path='news_app.db'):
    """Open the SQLite database used by the plugin.

    Args:
        db_path: Database file to open. Defaults to ``'news_app.db'``
            (the value that used to be hard-coded); ``':memory:'`` gives
            a throw-away database, which is handy for tests.

    Returns:
        A ``sqlite3.Connection`` created with ``check_same_thread=False``
        so the connection is not restricted to the thread that created it.
    """
    return sqlite3.connect(db_path, check_same_thread=False)
def init_db(self):
    """Create the plugin tables when the stored schema version is below 1.

    The version is read through ``get_db_version`` and bumped through
    ``set_db_version`` once the version-1 tables exist. The transaction is
    committed in every case.
    """
    schema_v1 = (
        '''CREATE TABLE IF NOT EXISTS sources
        (id INTEGER PRIMARY KEY, url TEXT, title TEXT)''',
        '''CREATE TABLE IF NOT EXISTS articles
        (id INTEGER PRIMARY KEY, source_id INTEGER, url TEXT UNIQUE, title TEXT, date TEXT,
        is_new INTEGER, is_excluded INTEGER DEFAULT 0)''',
        '''CREATE TABLE IF NOT EXISTS user_actions
        (id INTEGER PRIMARY KEY, article_id INTEGER, action TEXT, rating INTEGER, tags TEXT, timestamp TEXT)''',
        '''CREATE TABLE IF NOT EXISTS tags
        (id INTEGER PRIMARY KEY, name TEXT UNIQUE, description TEXT)''',
    )
    if self.get_db_version() < 1:
        for ddl in schema_v1:
            self.c.execute(ddl)
        self.set_db_version(1)
    # Later schema upgrades go here, each guarded by its own version check
    # (e.g. "if version < 2: ALTER TABLE ...; self.set_db_version(2)").
    self.conn.commit()
def get_db_version(self):
    """Return the schema version stored in ``db_version``.

    The ``db_version`` table is created on the fly when missing; an empty
    table is reported as version 0.
    """
    cursor = self.c
    cursor.execute('''CREATE TABLE IF NOT EXISTS db_version (version INTEGER)''')
    cursor.execute('SELECT version FROM db_version')
    row = cursor.fetchone()
    if row is None:
        return 0
    return row[0]
def set_db_version(self, version):
    """Store ``version`` as the schema version and commit.

    The value always lands in the row with ``rowid`` 1, so the table keeps
    a single row however often this is called.
    """
    statement = 'INSERT OR REPLACE INTO db_version (rowid, version) VALUES (1, ?)'
    self.c.execute(statement, (version,))
    self.conn.commit()
def get_tabs(self):
    """Return the single tab descriptor contributed by this plugin.

    The descriptor carries the translated plugin title under ``"name"`` and
    the plugin identifier ``"scansite"`` under ``"plugin"``.
    """
    tab = {"name": t("scansite_title"), "plugin": "scansite"}
    return [tab]
def run(self, config):
    """Render the plugin's main page.

    Shows the title, the link statistics and the known tags, then the
    "reset database" and "launch scan" buttons, and finally the article
    tabs. ``config`` is accepted but not read here.
    """
    st.title(t("scansite_title"))

    total_links, annotated_links = self.get_stats()
    for label_key, count in (("total_links", total_links), ("annotated_links", annotated_links)):
        st.write(f"{t(label_key)} : {count}")

    all_tags = self.get_all_tags()
    st.write(f"{t('known_tags')} :", ", ".join(all_tags))

    # (button label key, action to run, success message key)
    actions = (
        ("reset_database", self.reset_database, "database_reset_success"),
        ("launch_scan", self.launch_scan, "scan_complete"),
    )
    for button_key, action, success_key in actions:
        if st.button(t(button_key)):
            action()
            st.success(t(success_key))

    self.display_tabs()
def get_stats(self):
    """Return ``(total_links, annotated_links)``.

    ``total_links`` counts the articles that are not excluded;
    ``annotated_links`` counts the distinct article ids that have at least
    one 'click', 'rate' or 'tag' action.
    """
    count_articles = "SELECT COUNT(*) FROM articles WHERE is_excluded = 0"
    count_annotated = """
        SELECT COUNT(DISTINCT article_id) FROM user_actions
        WHERE action IN ('click', 'rate', 'tag')
    """
    (total_links,) = self.c.execute(count_articles).fetchone()
    (annotated_links,) = self.c.execute(count_annotated).fetchone()
    return total_links, annotated_links
def get_all_tags(self):
    """Return the names of all rows in the ``tags`` table as a list."""
    rows = self.c.execute("SELECT name FROM tags").fetchall()
    return [name for (name,) in rows]
def reset_database(self):
    """Drop every plugin table and rebuild an empty schema.

    ``db_version`` is dropped together with the data tables. ``init_db``
    only creates tables when the stored version is below 1, so leaving the
    old version row in place would make it skip table creation and leave
    the database without ``sources``/``articles``/``user_actions``/``tags``.
    """
    # Table names are fixed literals, never user input, so formatting them
    # into the statement is safe (identifiers cannot be bound with "?").
    for table in ("sources", "articles", "user_actions", "tags", "db_version"):
        self.c.execute(f"DROP TABLE IF EXISTS {table}")
    self.conn.commit()
    self.init_db()
def launch_scan(self):
    """Scan every registered source and store the links not seen before.

    For each source the existing articles are first flagged as not new
    (``mark_not_new``), then the links returned by ``scan_new_links`` are
    inserted as new, non-excluded articles dated today. URLs already
    present are skipped by ``INSERT OR IGNORE``. One commit is issued at
    the end.
    """
    insert_article = """
        INSERT OR IGNORE INTO articles (source_id, url, title, date, is_new, is_excluded)
        VALUES (?, ?, ?, ?, 1, 0)
    """
    for source in self.c.execute("SELECT * FROM sources").fetchall():
        source_id, source_url = source[0], source[1]
        self.mark_not_new(source_id)
        rows = [
            (source_id, link, title, datetime.now().strftime('%Y-%m-%d'))
            for link, title in self.scan_new_links(source_id, source_url)
        ]
        self.c.executemany(insert_article, rows)
    self.conn.commit()
def display_tabs(self):
    """Render the six article tabs (new, rated, clicked, tagged, ignored, excluded).

    Each tab shows a translated header followed by the paginated list
    returned by the matching ``get_*_articles`` query.
    """
    # (translation key for label and header, article query, widget-key prefix)
    sections = (
        ("new_articles", self.get_new_articles, "nouveaux"),
        ("rated_articles", self.get_rated_articles, "notes"),
        ("clicked_not_rated", self.get_clicked_not_rated_articles, "cliques"),
        ("tagged_articles", self.get_tagged_articles, "tagues"),
        ("ignored_articles", self.get_ignored_articles, "ignores"),
        ("excluded_articles", self.get_excluded_articles, "exclus"),
    )
    tabs = st.tabs([t(label_key) for label_key, _, _ in sections])
    all_tags = self.get_all_tags()
    for tab, (label_key, fetch_articles, tab_name) in zip(tabs, sections):
        with tab:
            st.header(t(label_key))
            self.display_paginated_articles(fetch_articles(), all_tags, tab_name)
def display_paginated_articles(self, articles, all_tags, tab_name, items_per_page=20):
    """Render one page of ``articles`` with page navigation controls.

    Args:
        articles: Sequence of article rows to paginate.
        all_tags: Tag names offered in each article's tag selector.
        tab_name: Prefix used to build the session-state and widget keys,
            so every tab keeps its own page number.
        items_per_page: Number of articles shown per page.

    The current page lives in ``st.session_state[f"{tab_name}_page"]``.
    """
    if not articles:
        st.write(t("no_articles"))
        return

    total_pages = (len(articles) - 1) // items_per_page + 1
    page_key = f"{tab_name}_page"
    if page_key not in st.session_state:
        st.session_state[page_key] = 1
    # The list can shrink between reruns (e.g. after excluding articles).
    # A remembered page beyond the new last page would be handed to
    # number_input as a default above max_value, so clamp it first.
    st.session_state[page_key] = min(max(st.session_state[page_key], 1), total_pages)

    page = st.number_input(
        t("page"),
        min_value=1,
        max_value=total_pages,
        value=st.session_state[page_key],
        key=f"{tab_name}_number_input",
    )
    st.session_state[page_key] = page

    start_idx = (page - 1) * items_per_page
    end_idx = start_idx + items_per_page
    for article in articles[start_idx:end_idx]:
        self.display_article(article, all_tags, tab_name)

    col1, col2, col3 = st.columns(3)
    with col1:
        if page > 1 and st.button(t("previous_page"), key=f"{tab_name}_prev"):
            st.session_state[page_key] = page - 1
            st.rerun()
    with col3:
        if page < total_pages and st.button(t("next_page"), key=f"{tab_name}_next"):
            st.session_state[page_key] = page + 1
            st.rerun()
    with col2:
        st.write(f"{t('page')} {page}/{total_pages}")
def display_article(self, article, all_tags, tab_name):
    """Render one article row: title button, link, rating, tags, exclude.

    Args:
        article: Article row; index 0 is read as the id, index 2 as the
            URL and index 3 as the title.
        all_tags: Tag names offered in the tag selector.
        tab_name: Prefix that keeps widget keys unique across tabs.

    Clicking the title records a 'click' action, clears ``is_new`` and
    shows a generated summary. Changing the slider or the tag selection
    records a 'rate' or 'tag' action. The exclude button flags the article
    as excluded and reruns the script.
    """
    article_id = article[0]
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    col1, col2, col3, col4, col5 = st.columns([3, 0.5, 1, 2, 1])

    with col1:
        summary_key = f"{tab_name}_summary_{article_id}"
        if summary_key not in st.session_state:
            st.session_state[summary_key] = None
        if st.button(article[3], key=f"{tab_name}_article_{article_id}"):
            summary = self.get_article_summary(article[2])
            st.session_state[summary_key] = summary
            self.c.execute("INSERT INTO user_actions (article_id, action, timestamp) VALUES (?, ?, ?)",
                           (article_id, 'click', datetime.now().strftime(timestamp_format)))
            self.c.execute("UPDATE articles SET is_new = 0 WHERE id = ?", (article_id,))
            self.conn.commit()
        if st.session_state[summary_key]:
            st.write(st.session_state[summary_key])

    with col2:
        st.markdown(f"[🔗]({article[2]})")

    with col3:
        rating_key = f"{tab_name}_rating_{article_id}"
        current_rating = self.get_article_rating(article_id)
        rating = st.slider(t("rating"), 0, 5, current_rating, key=rating_key)
        if rating != current_rating:
            self.c.execute("INSERT INTO user_actions (article_id, action, rating, timestamp) VALUES (?, ?, ?, ?)",
                           (article_id, 'rate', rating, datetime.now().strftime(timestamp_format)))
            self.conn.commit()

    with col4:
        tags_key = f"{tab_name}_tags_{article_id}"
        # Tags stored on the article may have been deleted from the tag
        # list since; the multiselect default must be a subset of its
        # options, so keep only the tags that still exist.
        current_tags = [tag for tag in self.get_article_tags(article_id) if tag in all_tags]
        selected_tags = st.multiselect(t("tags"), all_tags, default=current_tags, key=tags_key)
        if set(selected_tags) != set(current_tags):
            tags_str = ','.join(selected_tags)
            self.c.execute("INSERT INTO user_actions (article_id, action, tags, timestamp) VALUES (?, ?, ?, ?)",
                           (article_id, 'tag', tags_str, datetime.now().strftime(timestamp_format)))
            self.conn.commit()

    with col5:
        exclude_key = f"{tab_name}_exclude_{article_id}"
        if st.button(t("exclude"), key=exclude_key):
            self.c.execute("UPDATE articles SET is_excluded = 1 WHERE id = ?", (article_id,))
            self.conn.commit()
            st.rerun()
def get_config_ui(self, config):
    """Render the configuration UI for sources and tags.

    Args:
        config: Current configuration; accepted but not read here.

    Returns:
        A dict with the keys ``"sources"`` (source rows as loaded at the
        start of this call), ``"new_source_url"``, ``"tags"``
        (name/description rows as loaded during this call), ``"new_tag"``
        and ``"new_tag_description"``.

    Sources and tags can be updated or deleted in place. A new source or
    tag is only stored when the entered URL / tag name is not blank.
    """
    st.header(t("sources"))
    sources = self.c.execute("SELECT * FROM sources").fetchall()
    for source in sources:
        source_id, source_url, source_title = source[0], source[1], source[2]
        col1, col2, col3 = st.columns([3, 1, 1])
        with col1:
            new_title = st.text_input(f"{t('update')} {source_url}", value=source_title,
                                      key=f"source_title_{source_id}")
        with col2:
            if st.button(t("update"), key=f"update_source_{source_id}"):
                self.c.execute("UPDATE sources SET title = ? WHERE id = ?", (new_title, source_id))
                self.conn.commit()
        with col3:
            if st.button(t("delete"), key=f"delete_source_{source_id}"):
                self.c.execute("DELETE FROM sources WHERE id = ?", (source_id,))
                self.conn.commit()

    new_url = st.text_input(t("add_new_source"))
    if st.button(t("add_source")):
        cleaned_url = new_url.strip()
        # Pressing the button with an empty field must not create a
        # source row with a blank URL.
        if cleaned_url:
            title = self.fetch_page_title(cleaned_url)
            self.c.execute("INSERT INTO sources (url, title) VALUES (?, ?)", (cleaned_url, title))
            self.conn.commit()

    st.header(t("tags"))
    tags = self.get_all_tags_with_descriptions()
    for tag, description in tags:
        col1, col2, col3, col4 = st.columns([2, 3, 1, 1])
        with col1:
            st.text(tag)
        with col2:
            new_description = st.text_input(f"{t('update')} {tag}", value=description, key=f"tag_desc_{tag}")
        with col3:
            if st.button(t("update"), key=f"update_tag_{tag}"):
                self.add_or_update_tag(tag, new_description)
        with col4:
            if st.button(t("delete"), key=f"delete_tag_{tag}"):
                self.delete_tag(tag)

    new_tag = st.text_input(t("new_tag"))
    new_tag_description = st.text_input(t("new_tag_description"))
    if st.button(t("add_tag")):
        cleaned_tag = new_tag.strip()
        # Same guard as for sources: ignore a blank tag name.
        if cleaned_tag:
            self.add_or_update_tag(cleaned_tag, new_tag_description)

    # Hand the values shown in the UI back to the caller.
    return {
        "sources": sources,
        "new_source_url": new_url,
        "tags": tags,
        "new_tag": new_tag,
        "new_tag_description": new_tag_description,
    }
def fetch_page_title(self, url, timeout=10):
    """Return the ``<title>`` text of the page at ``url``.

    Args:
        url: Address of the page to download.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive host cannot block the UI indefinitely.

    Returns:
        The stripped title text. ``url`` itself is returned when the
        request fails or the page has no usable title, so callers always
        get a non-empty string for a non-empty ``url``.
    """
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException:
        # Best effort: covers malformed URLs, DNS/connection errors and
        # timeouts; fall back to the URL as the display title.
        return url
    title_tag = BeautifulSoup(response.text, 'html.parser').title
    if title_tag is None:
        return url
    # get_text() also copes with a <title> that contains nested markup,
    # where .string would be None.
    return title_tag.get_text(strip=True) or url
def mark_not_new(self, source_id):
    """Clear the ``is_new`` flag on every article of ``source_id`` and commit."""
    statement = "UPDATE articles SET is_new = 0 WHERE source_id = ?"
    self.c.execute(statement, (source_id,))
    self.conn.commit()
def scan_new_links(self, source_id, url):
    """Return the ``(link, title)`` pairs found at ``url`` that are not stored yet.

    Links come from ``scan_links``; a link is kept only when no row of
    ``articles`` has that URL. ``source_id`` is accepted but not used in
    the lookup.
    """
    def is_unknown(link):
        # True when the URL has never been saved as an article.
        self.c.execute("SELECT id, is_excluded FROM articles WHERE url = ?", (link,))
        return self.c.fetchone() is None

    return [(link, title) for link, title in self.scan_links(url) if is_unknown(link)]
def scan_links(self, url, timeout=10):
    """Collect the links on ``url`` that point to article-like pages.

    Every absolute (``http...``) anchor on the page is downloaded in turn
    and kept when the target document contains an ``<article>`` element.

    Args:
        url: Page to scan for links.
        timeout: Seconds to wait for each HTTP request, so one
            unresponsive host cannot stall the whole scan.

    Returns:
        A list of distinct ``(href, title)`` tuples, where ``title`` is the
        anchor text or, when that is empty, the href. The list is empty
        when the source page cannot be fetched (an error is shown).
    """
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException:
        st.error(f"Erreur lors du scan de {url}")
        return []

    def looks_like_article(href):
        # Best effort: an unreachable target is simply not an article.
        try:
            page = requests.get(href, timeout=timeout)
        except requests.RequestException:
            return False
        return BeautifulSoup(page.text, 'html.parser').find('article') is not None

    links = set()
    # Pages often link the same URL several times (image + headline);
    # remember the verdict so each target is downloaded only once.
    verdicts = {}
    soup = BeautifulSoup(response.text, 'html.parser')
    for anchor in soup.find_all('a'):
        href = anchor.get('href')
        if not href or not href.startswith('http'):
            continue
        if href not in verdicts:
            verdicts[href] = looks_like_article(href)
        if verdicts[href]:
            links.add((href, anchor.text.strip() or href))
    return list(links)
def get_article_summary(self, url, model="qwen2"):
    """Ask an Ollama model for a short summary of the article at ``url``.

    Args:
        url: Address of the article, embedded in the (French) prompt.
        model: Name of the Ollama model to query.

    Returns:
        The ``'response'`` field of the generation result.
    """
    # NOTE(review): only the URL is sent to the model; the article text is
    # not downloaded here — confirm the model can work from the URL alone.
    reply = ollama.generate(
        model=model,
        prompt=f"Résumez brièvement l'article à cette URL : {url}",
    )
    return reply['response']
def get_new_articles(self):
    """Return the new, non-excluded articles without any user action.

    An article qualifies when ``is_new = 1``, ``is_excluded = 0`` and no
    'click', 'rate' or 'tag' action references it. Rows are ordered by
    date, most recent first.
    """
    query = """
        SELECT * FROM articles
        WHERE is_new = 1
        AND is_excluded = 0
        AND id NOT IN (
        SELECT DISTINCT article_id
        FROM user_actions
        WHERE action IN ('click', 'rate', 'tag')
        )
        ORDER BY date DESC
    """
    cursor = self.c.execute(query)
    return cursor.fetchall()
def get_rated_articles(self):
    """Return the non-excluded articles that have at least one rating.

    Each article appears once, ordered by the time of its most recent
    'rate' action (latest first). Grouping by article and sorting on
    ``MAX(timestamp)`` makes that ordering well defined; sorting a
    ``DISTINCT`` result on a column that is not selected would order each
    article by an arbitrary one of its rating rows.
    """
    return self.c.execute("""
        SELECT a.*
        FROM articles a
        JOIN user_actions ua ON a.id = ua.article_id
        WHERE ua.action = 'rate'
        AND a.is_excluded = 0
        GROUP BY a.id
        ORDER BY MAX(ua.timestamp) DESC
    """).fetchall()
def get_clicked_not_rated_articles(self):
    """Return the non-excluded articles that were clicked but never rated or tagged.

    Each article appears once, ordered by the time of its most recent
    'click' action (latest first). Grouping by article and sorting on
    ``MAX(timestamp)`` keeps the ordering well defined when an article was
    clicked several times.
    """
    return self.c.execute("""
        SELECT a.*
        FROM articles a
        JOIN user_actions ua ON a.id = ua.article_id
        WHERE ua.action = 'click'
        AND a.is_excluded = 0
        AND a.id NOT IN (
            SELECT article_id
            FROM user_actions
            WHERE action IN ('rate', 'tag')
        )
        GROUP BY a.id
        ORDER BY MAX(ua.timestamp) DESC
    """).fetchall()
def get_tagged_articles(self):
    """Return the non-excluded articles that were tagged but never rated or clicked.

    Each article appears once, ordered by the time of its most recent
    'tag' action (latest first). Grouping by article and sorting on
    ``MAX(timestamp)`` keeps the ordering well defined when an article was
    tagged several times.
    """
    return self.c.execute("""
        SELECT a.*
        FROM articles a
        JOIN user_actions ua ON a.id = ua.article_id
        WHERE ua.action = 'tag'
        AND a.is_excluded = 0
        AND a.id NOT IN (
            SELECT article_id
            FROM user_actions
            WHERE action IN ('rate', 'click')
        )
        GROUP BY a.id
        ORDER BY MAX(ua.timestamp) DESC
    """).fetchall()
def get_ignored_articles(self):
    """Return the articles that are no longer new yet were never acted upon.

    An article qualifies when ``is_new = 0``, ``is_excluded = 0`` and no
    'click', 'rate' or 'tag' action references it. Rows are ordered by
    date, most recent first.
    """
    query = """
        SELECT * FROM articles
        WHERE is_new = 0
        AND is_excluded = 0
        AND id NOT IN (
        SELECT DISTINCT article_id
        FROM user_actions
        WHERE action IN ('click', 'rate', 'tag')
        )
        ORDER BY date DESC
    """
    cursor = self.c.execute(query)
    return cursor.fetchall()
def get_excluded_articles(self):
    """Return every article flagged ``is_excluded = 1``, most recent date first."""
    query = """
        SELECT * FROM articles
        WHERE is_excluded = 1
        ORDER BY date DESC
    """
    cursor = self.c.execute(query)
    return cursor.fetchall()
def get_article_rating(self, article_id):
    """Return the most recent rating recorded for ``article_id``.

    Returns 0 when the article has no 'rate' action or the stored rating
    is NULL. Timestamps are stored with one-second resolution, so ``id``
    is used as a tie-breaker: among ratings saved within the same second
    the one inserted last wins.
    """
    self.c.execute(
        "SELECT rating FROM user_actions WHERE article_id = ? AND action = 'rate' "
        "ORDER BY timestamp DESC, id DESC LIMIT 1",
        (article_id,),
    )
    row = self.c.fetchone()
    if row is None or row[0] is None:
        return 0
    return row[0]
def get_article_tags(self, article_id):
    """Return the tag list from the most recent 'tag' action of ``article_id``.

    Tags are stored as one comma-separated string. Returns an empty list
    when the article has no 'tag' action or the stored string is empty.
    Empty pieces (e.g. from a stray comma) are dropped. ``id`` breaks ties
    between tag actions saved within the same second, so the one inserted
    last wins.
    """
    self.c.execute(
        "SELECT tags FROM user_actions WHERE article_id = ? AND action = 'tag' "
        "ORDER BY timestamp DESC, id DESC LIMIT 1",
        (article_id,),
    )
    row = self.c.fetchone()
    if row is None or not row[0]:
        return []
    return [tag for tag in row[0].split(',') if tag]
def get_all_tags_with_descriptions(self):
    """Return one ``(name, description)`` row per entry of the ``tags`` table."""
    cursor = self.c.execute("SELECT name, description FROM tags")
    return cursor.fetchall()
def add_or_update_tag(self, name, description):
    """Create the tag ``name`` or replace its description, then commit.

    A missing or blank ``name`` is ignored, so submitting an empty form
    field does not create an empty tag. Existing tags are matched on
    their unique name (``INSERT OR REPLACE``).
    """
    if not name or not name.strip():
        return
    self.c.execute("INSERT OR REPLACE INTO tags (name, description) VALUES (?, ?)", (name, description))
    self.conn.commit()
def delete_tag(self, name):
    """Remove the tag called ``name`` from the ``tags`` table and commit."""
    statement = "DELETE FROM tags WHERE name = ?"
    self.c.execute(statement, (name,))
    self.conn.commit()
def get_reference_data(self):
    """Split the non-excluded articles into rated and unrated reference sets.

    Returns:
        A pair ``(reference_data_valid, reference_data_rejected)``:
        ``reference_data_valid`` holds ``(url, title, rating)`` for every
        article whose latest rating is above 0, and
        ``reference_data_rejected`` holds ``(url, title)`` for every
        article whose rating is 0 (never rated, or rated 0). Both follow
        the query order: rating descending, then date descending.
    """
    # The latest rating is picked by an explicit correlated subquery
    # (newest timestamp, id as tie-breaker). "GROUP BY ... HAVING
    # MAX(timestamp)" only tests the maximum for truthiness and does not
    # by itself select the row that carries the latest rating.
    self.c.execute("""
        SELECT a.id, a.url, a.title,
               COALESCE((
                   SELECT ua.rating
                   FROM user_actions ua
                   WHERE ua.article_id = a.id
                   AND ua.action = 'rate'
                   ORDER BY ua.timestamp DESC, ua.id DESC
                   LIMIT 1
               ), 0) AS rating
        FROM articles a
        WHERE a.is_excluded = 0
        ORDER BY rating DESC, a.date DESC
    """)
    articles = self.c.fetchall()

    # Rated articles are the "valid" references, unrated ones the "rejected".
    reference_data_valid = [(url, title, rating) for _, url, title, rating in articles if rating > 0]
    reference_data_rejected = [(url, title) for _, url, title, rating in articles if rating == 0]
    return reference_data_valid, reference_data_rejected