# sipmen/app/utils.py
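"""Scraping and text-analysis helpers for the "berita" (news) feature.

Fetches article links and bodies with httpx + BeautifulSoup, cleans and
stopword-filters the Indonesian text, scores sentiment with TextBlob, and
exports a word-cloud image and a CSV of the scraped articles.
"""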
from fastapi import HTTPException
import httpx
import asyncio
from bs4 import BeautifulSoup
from googletrans import Translator
import re
from textblob import TextBlob
import nltk
import matplotlib

matplotlib.use("Agg")  # headless backend, needed when rendering inside a server process
import matplotlib.pyplot as plt
import os
import uuid
from urllib.parse import urlparse
import pandas as pd
from wordcloud import WordCloud, STOPWORDS

from .constants import STOP_WORDS_REMOVER_NEW

# nltk.download("punkt")  # run once: TextBlob relies on NLTK tokenizer data


async def get_links(urls):
    """Fetch all article links from the given listing pages concurrently."""
    links = []
    async with httpx.AsyncClient() as client:
        tasks = [fetch_links(client, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, list):
                links.extend(result)
            else:
                print(f'error: {result}')
    return links


async def fetch_links(client, url):
    """Collect the href of the first anchor inside each <article> on one page."""
    links = []
    try:
        response = await client.get(url)
        soup = BeautifulSoup(response.text, 'html.parser')
        for article in soup.find_all('article'):
            anchor = article.find('a')
            # skip articles without a usable link instead of aborting the whole page
            if anchor and anchor.get('href'):
                links.append(anchor['href'])
    except Exception as e:
        print(f'error fetching {url}: {e}')
    return links


async def get_texts(links):
    """Fetch and preprocess the article behind each link concurrently."""
    texts = []
    async with httpx.AsyncClient() as client:
        tasks = [fetch_texts(client, link) for link in links]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, list):
                texts.extend(result)
            else:
                print(f'error: {result}')
    return texts


async def fetch_texts(client, url):
    """Scrape one article: the title plus its first two paragraphs, cleaned and stopword-filtered."""
    texts = []
    try:
        response = await client.get(url)
        soup = BeautifulSoup(response.text, 'html.parser')
        paragraphs = soup.find('div', class_='entry-content').find_all('p')
        first_2_paragraphs = " ".join(paragraph.text.lower() for paragraph in paragraphs[:2])
        title = soup.find('h1', class_='entry-title').text.lower()
        txt = f'{title} {first_2_paragraphs}'
        cleaned_txt = clean_text(txt)
        stopword_txt = stopword(cleaned_txt)
        texts.append({
            "title": title,
            "paragraph": stopword_txt,
            "url": url,
            # media name = first label of the hostname, e.g. "example" from example.com
            "nama_media": urlparse(url).netloc.split(".")[0],
        })
    except Exception as err:
        print(f'error fetching {url}: {err}')
    return texts


def clean_text(text):
    text = re.sub(r'[^A-Za-z0-9 /]', '', text)  # keep only alphanumerics, spaces, and slashes
    text = re.sub(r'\s+', ' ', text).strip()    # collapse runs of whitespace (covers newlines too)
    return text


def stopword(str_text):
    return STOP_WORDS_REMOVER_NEW.remove(str_text)


def convert_english(text):
    # translate Indonesian to English so TextBlob's English sentiment model can score it
    translator = Translator()
    return translator.translate(text, dest='en', src='id').text


def sentiment_analysis(texts):
    status = []
    total_positif = total_negatif = total_netral = 0
    for text in texts:
        # TextBlob polarity ranges from -1.0 (negative) to 1.0 (positive)
        polarity = TextBlob(text).sentiment.polarity
        if polarity > 0.0:
            total_positif += 1
            status.append('Positif')
        elif polarity == 0.0:
            total_netral += 1
            status.append('Netral')
        else:
            total_negatif += 1
            status.append('Negatif')
    return {
        "status": status,
        "positif": total_positif,
        "netral": total_netral,
        "negatif": total_negatif,
        "total": len(texts),
    }


def generate_wordcloud(texts):
    try:
        # Generate the word cloud from all texts combined
        all_words = " ".join(texts)
        wordcloud = WordCloud(
            width=3000,
            height=2000,
            random_state=3,
            background_color='white',
            stopwords=STOPWORDS,
        ).generate(all_words)
        # Create a unique filename (as with generate_csv) so runs don't overwrite each other
        format_file = "svg"
        filename = f"{uuid.uuid4()}.{format_file}"
        file_path = os.path.join('./static/wordcloud/', filename)
        # Save the word cloud to a file
        plt.figure()
        plt.imshow(wordcloud, interpolation='bilinear')
        plt.axis('off')
        plt.tight_layout(pad=0)
        plt.savefig(file_path, format=format_file)
        plt.close()
        return file_path
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


def generate_csv(data):
    # `data` is typically the list of article dicts produced by get_texts
    df = pd.DataFrame(data)
    filename = f"{uuid.uuid4()}.csv"
    file_path = os.path.join('./static/csv/', filename)
    df.to_csv(file_path)
    return file_path
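

# A minimal end-to-end sketch of how these helpers compose, assuming listing
# pages whose markup matches the selectors above (<article> links, an
# entry-content div, an entry-title heading). The URL is hypothetical, and the
# ./static/wordcloud/ and ./static/csv/ directories are assumed to exist.
if __name__ == "__main__":
    async def _demo():
        listing_urls = ["https://example.com/berita/"]  # hypothetical listing page
        links = await get_links(listing_urls)
        articles = await get_texts(links)
        # sentiment is scored on the English translations of the cleaned text
        english = [convert_english(article["paragraph"]) for article in articles]
        print(sentiment_analysis(english))
        print(generate_wordcloud([article["paragraph"] for article in articles]))
        print(generate_csv(articles))

    asyncio.run(_demo())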