Spaces:
Running
Running
from fastapi import HTTPException | |
import httpx | |
import asyncio | |
from bs4 import BeautifulSoup | |
from googletrans import Translator | |
import re | |
from textblob import TextBlob | |
import nltk | |
import matplotlib.pyplot as plt | |
import os | |
import uuid | |
from urllib.parse import urlparse | |
import pandas as pd | |
from wordcloud import WordCloud, STOPWORDS | |
from .constants import STOP_WORDS_REMOVER_NEW | |
# nltk.download("punkt") | |
async def get_links(urls):
    """Concurrently scrape article links from every listing page in *urls*.

    Each URL is fetched with a shared async HTTP client; per-URL failures
    are printed and skipped, and all successful results are flattened
    into one list of hrefs.
    """
    async with httpx.AsyncClient() as client:
        outcomes = await asyncio.gather(
            *(fetch_links(client, url) for url in urls),
            return_exceptions=True,
        )
        collected = []
        for outcome in outcomes:
            if isinstance(outcome, list):
                collected.extend(outcome)
            else:
                print(f'error: {outcome}')
    return collected
async def fetch_links(client, url):
    """Fetch one listing page and return the href of the first anchor in
    each ``<article>`` element.

    Fix: the original did ``article.find('a')['href']`` unguarded, so a
    single article without an anchor (or without an href attribute)
    raised inside the loop and silently discarded every remaining
    article on the page. Such articles are now skipped individually.

    Any download/parse failure is printed and whatever was collected so
    far (normally an empty list) is returned — callers rely on this
    best-effort behavior.
    """
    links = []
    try:
        response = await client.get(url)
        soup = BeautifulSoup(response.text, 'html.parser')
        for article in soup.find_all('article'):
            anchor = article.find('a')
            # Skip articles whose first anchor is missing or has no href.
            if anchor is not None and anchor.has_attr('href'):
                links.append(anchor['href'])
    except Exception as e:
        print(f'error fetching {url}: {e}')
    return links
async def get_texts(links):
    """Concurrently download and extract article text for every URL in *links*.

    Mirrors ``get_links``: per-URL failures are printed and skipped;
    successful per-article dicts are flattened into a single list.
    """
    async with httpx.AsyncClient() as client:
        outcomes = await asyncio.gather(
            *(fetch_texts(client, link) for link in links),
            return_exceptions=True,
        )
        gathered = []
        for outcome in outcomes:
            if isinstance(outcome, list):
                gathered.extend(outcome)
            else:
                print(f'error: {outcome}')
    return gathered
async def fetch_texts(client, url):
    """Download one article page and extract its cleaned text.

    Builds a single lowercase string from the ``entry-title`` heading plus
    the first two ``entry-content`` paragraphs, runs it through
    ``clean_text`` and ``stopword``, and returns a one-element list of
    dicts (title, processed paragraph, url, media name derived from the
    hostname). Any failure — including pages missing the expected nodes —
    is printed and an empty list is returned.
    """
    texts = []
    try:
        response = await client.get(url)
        soup = BeautifulSoup(response.text, 'html.parser')
        paragraphs = soup.find('div', class_='entry-content').find_all('p')
        lead = " ".join(p.text.lower() for p in paragraphs[:2])
        title = soup.find('h1', class_='entry-title').text.lower()
        processed = stopword(clean_text(f'{title} {lead}'))
        entry = {
            "title": title,
            "paragraph": processed,
            "url": url,
            # First hostname label, e.g. "kompas" from "kompas.com".
            "nama_media": urlparse(url).netloc.split(".")[0],
        }
        texts.append(entry)
    except Exception as err:
        print(f'error fetching {url}: {err}')
    return texts
def clean_text(text):
    """Normalize scraped text for downstream processing.

    Drops every character outside ``[A-Za-z0-9 /]`` (keeping spaces and
    slashes), then collapses whitespace runs to single spaces and strips
    the ends.

    Fix: the original ended with ``re.sub(r'\\n', ' ', text)`` — dead
    code, since newlines are outside the kept character class and are
    deleted by the first substitution; it has been removed.
    """
    text = re.sub(r'[^A-Za-z0-9 /]', '', text)  # remove non-alphanumerics (keep space, '/')
    return re.sub(r'\s+', ' ', text).strip()    # collapse excess whitespace
def stopword(str_text):
    """Remove stop words from *str_text* using the shared remover
    configured in ``constants.STOP_WORDS_REMOVER_NEW``."""
    return STOP_WORDS_REMOVER_NEW.remove(str_text)
def convert_english(text):
    """Translate Indonesian *text* to English via googletrans.

    A fresh ``Translator`` is created per call; returns the translated
    string only.
    """
    return Translator().translate(text, dest='en', src='id').text
def sentiment_analysis(texts):
    """Label each text by TextBlob polarity and tally the results.

    Polarity > 0 → 'Positif', == 0 → 'Netral', < 0 → 'Negatif'.
    Returns a dict with the per-text labels (``status``, same order as
    *texts*) and the aggregate counts; handles an empty input by
    returning all-zero counts.

    NOTE(review): TextBlob polarity is English-oriented — presumably
    *texts* are translated first (see ``convert_english``); confirm
    against callers.

    Fixes: dropped the pointless ``enumerate`` whose index was discarded,
    and the redundant manual ``total`` counter (always ``len(status)``).
    """
    status = []
    total_positif = total_negatif = total_netral = 0
    for text in texts:
        polarity = TextBlob(text).sentiment.polarity
        if polarity > 0.0:
            total_positif += 1
            status.append('Positif')
        elif polarity == 0.0:
            total_netral += 1
            status.append('Netral')
        else:
            total_negatif += 1
            status.append('Negatif')
    return {
        "status": status,
        "positif": total_positif,
        "netral": total_netral,
        "negatif": total_negatif,
        "total": len(status),
    }
def generate_wordcloud(texts):
    """Render a word cloud from *texts* and save it as an SVG.

    Joins all texts into one corpus, renders it with a fixed
    ``random_state`` for reproducible layout, saves the figure under
    ``./static/wordcloud/`` and returns the file path.

    Raises:
        HTTPException: 500 with the underlying error message on any failure.

    Fixes: the original comment promised "a unique filename" but used a
    fixed ``wordcloud.svg``, so concurrent requests overwrote each
    other — now uses a uuid4 name consistent with ``generate_csv``; the
    output directory is created if missing; the redundant
    ``[text for text in texts]`` copy is gone.
    """
    try:
        all_words = " ".join(texts)
        wordcloud = WordCloud(
            width=3000,
            height=2000,
            random_state=3,  # deterministic layout across runs
            background_color='white',
            stopwords=STOPWORDS,
        ).generate(all_words)
        # Unique filename so concurrent requests do not clobber each other.
        format_file = "svg"
        out_dir = './static/wordcloud/'
        os.makedirs(out_dir, exist_ok=True)
        file_path = os.path.join(out_dir, f"{uuid.uuid4()}.{format_file}")
        # Save the word cloud to a file.
        plt.figure()
        plt.imshow(wordcloud, interpolation='bilinear')
        plt.axis('off')
        plt.tight_layout(pad=0)
        plt.savefig(file_path, format=format_file)
        plt.close()
        return file_path
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
def generate_csv(object):  # NOTE(review): param shadows builtin `object`; kept for caller compatibility
    """Serialize *object* (anything ``pd.DataFrame`` accepts — dict of
    columns, list of records, ...) to a uniquely named CSV under
    ``./static/csv/`` and return the file path.

    Fix: the directory is now created if missing instead of letting
    ``to_csv`` fail with FileNotFoundError on a fresh deployment.
    """
    out_dir = './static/csv/'
    os.makedirs(out_dir, exist_ok=True)
    file_path = os.path.join(out_dir, f"{uuid.uuid4()}.csv")
    # Index column is intentionally kept, matching the original output format.
    pd.DataFrame(object).to_csv(file_path)
    return file_path