from fastapi import HTTPException
import httpx
import asyncio
from bs4 import BeautifulSoup
from googletrans import Translator
import re
from textblob import TextBlob
import nltk
import matplotlib.pyplot as plt
import os
import uuid
from urllib.parse import urlparse
import pandas as pd
from wordcloud import WordCloud, STOPWORDS

from .constants import STOP_WORDS_REMOVER_NEW

# nltk.download("punkt")


async def get_links(urls):
    # Fetch every index page concurrently and collect the article links.
    links = []
    async with httpx.AsyncClient() as client:
        tasks = []
        for url in urls:
            tasks.append(fetch_links(client, url))
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, list):
                links.extend(result)
            else:
                print(f'error: {result}')
    return links


async def fetch_links(client, url):
    # Extract the first anchor href from each <article> element on the page.
    links = []
    try:
        response = await client.get(url)
        soup = BeautifulSoup(response.text, 'html.parser')
        articles = soup.find_all('article')
        for article in articles:
            link = article.find('a')['href']
            links.append(link)
    except Exception as e:
        print(f'error fetching {url}: {e}')
    return links


async def get_texts(links):
    # Fetch every article concurrently and collect the cleaned text records.
    texts = []
    async with httpx.AsyncClient() as client:
        tasks = []
        for link in links:
            tasks.append(fetch_texts(client, link))
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, list):
                texts.extend(result)
            else:
                print(f'error: {result}')
    return texts


async def fetch_texts(client, url):
    # Build one record per article: lowercased title plus the first two paragraphs,
    # cleaned and stopword-filtered, with the media name taken from the domain.
    texts = []
    try:
        response = await client.get(url)
        soup = BeautifulSoup(response.text, 'html.parser')
        paragraphs = soup.find('div', class_='entry-content').find_all('p')
        first_2_paragraphs = " ".join([paragraph.text.lower() for paragraph in paragraphs[:2]])
        title = soup.find('h1', class_='entry-title').text.lower()
        txt = f'{title} {first_2_paragraphs}'
        cleaned_txt = clean_text(txt)
        stopword_txt = stopword(cleaned_txt)
        texts.append({
            "title": title,
            "paragraph": stopword_txt,
            "url": url,
            "nama_media": urlparse(url).netloc.split(".")[0]
        })
    except Exception as err:
        print(f'error fetching {url}: {err}')
    return texts


def clean_text(text):
    text = re.sub(r'\n', ' ', text)             # replace line breaks with spaces first so words are not merged
    text = re.sub(r'[^A-Za-z0-9 /]', '', text)  # strip non-alphanumeric characters
    text = re.sub(r'\s+', ' ', text).strip()    # collapse repeated whitespace
    return text


def stopword(str_text):
    str_text = STOP_WORDS_REMOVER_NEW.remove(str_text)
    return str_text


def convert_english(text):
    # Translate Indonesian text to English so TextBlob's polarity scores apply.
    translator = Translator()
    translation = translator.translate(text, dest='en', src='id').text
    return translation


def sentiment_analysis(texts):
    # Classify each text by TextBlob polarity and tally the class counts.
    status = []
    total_positif = total_negatif = total_netral = total = 0
    for text in texts:
        analysis = TextBlob(text)
        if analysis.sentiment.polarity > 0.0:
            total_positif += 1
            status.append('Positif')
        elif analysis.sentiment.polarity == 0.0:
            total_netral += 1
            status.append('Netral')
        else:
            total_negatif += 1
            status.append('Negatif')
        total += 1
    return {
        "status": status,
        "positif": total_positif,
        "netral": total_netral,
        "negatif": total_negatif,
        "total": total
    }


def generate_wordcloud(texts):
    try:
        # Generate the word cloud from the combined texts
        all_words = " ".join(texts)
        wordcloud = WordCloud(
            width=3000,
            height=2000,
            random_state=3,
            background_color='white',
            stopwords=STOPWORDS,
        ).generate(all_words)

        # Build the output path (the filename is fixed, so each run overwrites the previous file)
        format_file = "svg"
        filename = f"wordcloud.{format_file}"
        file_path = os.path.join('./static/wordcloud/', filename)

        # Render the word cloud and save it to disk
        plt.figure()
        plt.imshow(wordcloud, interpolation='bilinear')
        plt.axis('off')
        plt.tight_layout(pad=0)
        plt.savefig(file_path, format=format_file)
        plt.close()

        return file_path
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


def generate_csv(data):
    # Write the records to a uniquely named CSV file and return its path.
    df = pd.DataFrame(data)
    filename = f"{uuid.uuid4()}.csv"
    file_path = os.path.join('./static/csv/', filename)
    df.to_csv(file_path)
    return file_path
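

# --- Hypothetical usage sketch (an assumption, not part of the original module) ---
# One plausible way a caller (for example a FastAPI route handler) could chain the
# helpers above: scrape article links from index pages, extract and clean the texts,
# translate them to English so TextBlob's polarity scores are meaningful, then produce
# the sentiment summary, word cloud image, and CSV export. The function name and the
# shape of the returned dict are invented here for illustration only.
async def run_analysis_pipeline(urls):
    links = await get_links(urls)        # index pages -> article links
    records = await get_texts(links)     # article links -> cleaned text records

    # TextBlob works on English text, so translate the Indonesian paragraphs first.
    english_texts = [convert_english(record["paragraph"]) for record in records]
    sentiment = sentiment_analysis(english_texts)

    wordcloud_path = generate_wordcloud([record["paragraph"] for record in records])
    csv_path = generate_csv(records)

    return {
        "sentiment": sentiment,
        "wordcloud": wordcloud_path,
        "csv": csv_path,
    }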