# BBB25 sentiment-analysis Streamlit app: fetches recent tweets, scores
# sentiment, generates a themed comment with a Portuguese GPT-2, and posts it.
import tweepy | |
from transformers import pipeline, GPT2LMHeadModel, GPT2Tokenizer | |
import os | |
import streamlit as st | |
from datetime import datetime | |
import time | |
from tenacity import retry, stop_after_attempt, wait_exponential | |
import re | |
from collections import Counter | |
def debug_print(message):
    """Log *message* to stdout and mirror it in the Streamlit page."""
    for sink in (print, st.text):
        sink(message)
def fetch_tweets(client, query, tweet_fields):
    """Search recent tweets matching *query*.

    Returns the tweepy response object, or None when the search fails or
    yields no data — callers treat "no tweets" and "API error" uniformly.
    """
    try:
        response = client.search_recent_tweets(
            query=query,
            max_results=10,  # keep the request small
            tweet_fields=tweet_fields,
        )
        # Normalize "missing data attribute" and "data is None" to None.
        if getattr(response, 'data', None) is None:
            return None
        return response
    except Exception as exc:
        debug_print(f"Erro na busca: {str(exc)}")
        return None
def post_tweet(client, text):
    """Publish *text* as a tweet; return the API response, or None on failure."""
    try:
        return client.create_tweet(text=text)
    except Exception as exc:
        debug_print(f"Erro ao postar tweet: {str(exc)}")
        return None
def extract_context_from_tweets(tweets_data):
    """Build a lightweight context summary from a batch of tweets.

    Returns a dict with the most frequent Title-case words (taken as likely
    participant names), up to two detected show events, and the cleaned
    combined text.
    """
    combined = " ".join(tweet.text for tweet in tweets_data)
    # Strip URLs, @mentions, the literal "RT", and punctuation.
    cleaned = re.sub(r'http\S+|@\S+|RT|[^\w\s]', ' ', combined)
    lowered = cleaned.lower()

    # Frequent capitalized words are treated as participant names (top 3).
    name_counts = Counter(
        word for word in cleaned.split() if word.istitle() and len(word) > 2
    )
    top_names = [name for name, _ in name_counts.most_common(3)]

    # Scan for a fixed vocabulary of show events, keeping at most two.
    found_events = [
        keyword
        for keyword in ('paredão', 'prova', 'líder', 'eliminação', 'briga')
        if keyword in lowered
    ]

    return {
        'participants': top_names,
        'events': found_events[:2],
        'raw_text': cleaned,
    }
def generate_comment(context, sentiment_ratio, model, tokenizer):
    """Generate a short tweet about the show, tuned to the crowd's mood.

    Builds a Portuguese prompt from the sentiment split plus any detected
    participant/event, samples a continuation from the language model, and
    appends the campaign hashtag while honoring the 280-char tweet limit.
    Falls back to a canned message when generation fails.
    """
    # Map the sentiment split onto a mood descriptor for the prompt.
    if sentiment_ratio['positive'] > 0.5:
        tone = "clima animado"
    elif sentiment_ratio['negative'] > 0.5:
        tone = "clima tenso"
    else:
        tone = "opiniões divididas"

    # Assemble the prompt from whatever context pieces are available.
    pieces = [f"BBB25 com {tone}"]
    if context['participants']:
        pieces.append(context['participants'][0])
    if context['events']:
        pieces.append(context['events'][0])
    prompt = ", ".join(pieces)

    try:
        encoded = tokenizer.encode(prompt, return_tensors='pt', max_length=100, truncation=True)
        generated = model.generate(
            encoded,
            max_length=150,
            num_return_sequences=1,
            temperature=0.8,
            top_k=40,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id,
        )
        body = tokenizer.decode(generated[0], skip_special_tokens=True)
        body = re.sub(r'\s+', ' ', body).strip()

        # Trim so text + hashtags never exceeds the 280-char tweet limit.
        hashtags = " #BBB25"
        if len(body) + len(hashtags) > 280:
            body = body[:277 - len(hashtags)] + "..."
        return body + hashtags
    except Exception as exc:
        debug_print(f"Erro na geração: {str(exc)}")
        return f"BBB25: {tone} hoje! #BBB25"  # simple fallback
def main():
    """Run the end-to-end pipeline in Streamlit.

    Steps: validate Twitter credentials, fetch recent BBB25 tweets, score
    each tweet's sentiment, generate a themed comment with a Portuguese
    GPT-2, post it to Twitter, render the results, and append to a log.

    Side effects: posts a tweet, renders Streamlit widgets, appends one
    line to posting_log.txt. Errors are reported via st.error.
    """
    try:
        st.title("Análise de Sentimentos - BBB25")

        # Fail fast when any Twitter credential is absent from the environment.
        required_vars = [
            'TWITTER_API_KEY',
            'TWITTER_API_SECRET_KEY',
            'TWITTER_ACCESS_TOKEN',
            'TWITTER_ACCESS_TOKEN_SECRET',
            'TWITTER_BEARER_TOKEN'
        ]
        if any(os.getenv(var) is None for var in required_vars):
            raise ValueError("Faltam variáveis de ambiente necessárias")

        # Twitter API v2 client; wait_on_rate_limit sleeps through rate
        # limits instead of raising.
        client = tweepy.Client(
            bearer_token=os.getenv('TWITTER_BEARER_TOKEN'),
            consumer_key=os.getenv('TWITTER_API_KEY'),
            consumer_secret=os.getenv('TWITTER_API_SECRET_KEY'),
            access_token=os.getenv('TWITTER_ACCESS_TOKEN'),
            access_token_secret=os.getenv('TWITTER_ACCESS_TOKEN_SECRET'),
            wait_on_rate_limit=True
        )

        # Portuguese GPT-2 used for comment generation.
        model_name = "pierreguillou/gpt2-small-portuguese"
        tokenizer = GPT2Tokenizer.from_pretrained(model_name)
        model = GPT2LMHeadModel.from_pretrained(model_name)

        # Recent original Portuguese tweets about the show (no RTs/replies).
        query = 'BBB25 lang:pt -is:retweet -is:reply'
        tweet_fields = ['text', 'created_at']
        with st.spinner('Buscando tweets...'):
            tweets = fetch_tweets(client, query, tweet_fields)
        if tweets is None or not tweets.data:
            st.error("Não foi possível obter tweets")
            return

        context = extract_context_from_tweets(tweets.data)

        # Score each tweet with a 1-5 star model and bucket the rating.
        with st.spinner('Analisando sentimentos...'):
            sentiment_pipeline = pipeline(
                "sentiment-analysis",
                model="nlptown/bert-base-multilingual-uncased-sentiment"
            )
            sentiments = []
            for tweet in tweets.data[:10]:  # cap the workload at 10 tweets
                result = sentiment_pipeline(tweet.text[:512])  # model input limit
                rating = int(result[0]['label'].split()[0])  # "4 stars" -> 4
                if rating >= 4:
                    sentiments.append('positive')
                elif rating <= 2:
                    sentiments.append('negative')
                else:
                    sentiments.append('neutral')

        # Guard the empty case explicitly: previously the ratios and tweet
        # text were only assigned under `if sentiments:` yet used
        # unconditionally below, which would raise NameError.
        if not sentiments:
            st.error("Não foi possível obter tweets")
            return

        total = len(sentiments)
        sentiment_ratios = {
            'positive': sentiments.count('positive') / total,
            'negative': sentiments.count('negative') / total,
            'neutral': sentiments.count('neutral') / total
        }

        # Generate the commentary tweet and publish it.
        with st.spinner('Gerando e postando tweet...'):
            tweet_text = generate_comment(context, sentiment_ratios, model, tokenizer)
            post_tweet(client, tweet_text)

        # Results dashboard.
        st.title("Resultados")
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Positivo", f"{sentiment_ratios['positive']:.1%}")
        with col2:
            st.metric("Neutro", f"{sentiment_ratios['neutral']:.1%}")
        with col3:
            st.metric("Negativo", f"{sentiment_ratios['negative']:.1%}")
        st.subheader("Tweet Gerado")
        st.write(tweet_text)

        # Append to the posting log; utf-8 avoids UnicodeEncodeError on
        # platforms whose locale encoding cannot represent emoji/accents.
        with open('posting_log.txt', 'a', encoding='utf-8') as f:
            f.write(f"{datetime.now()}: {tweet_text}\n")
    except Exception as e:
        st.error(f"Erro: {str(e)}")
    finally:
        st.markdown("---")
        st.markdown(
            """
            <div style='text-align: center'>
                <small>Desenvolvido com ❤️ usando Streamlit e Transformers</small>
            </div>
            """,
            unsafe_allow_html=True
        )
# Script entry point: run the pipeline only when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()