import os
from transformers import pipeline
from flask_cors import CORS
from flask import Flask, request, json
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import google.generativeai as genai
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_google_genai import ChatGoogleGenerativeAI
from dotenv import load_dotenv
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate
import pandas as pd
import matplotlib
matplotlib.use("Agg")  # headless backend so chart rendering works inside the Flask server
import matplotlib.pyplot as plt
def load_model():
    # Gemini chat model used by the retrieval-augmented QA chain below.
    api_key = os.getenv("GOOGLE_API_KEY")
    genai.configure(api_key=api_key)
    model = ChatGoogleGenerativeAI(model="gemini-pro", temperature=0.3)
    return model

def load_embeddings():
    # Embedding model used to index review chunks in Chroma.
    embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
    return embeddings
os.environ["CUDA_VISIBLE_DEVICES"] = ""  # hide GPUs so the transformers pipeline runs on CPU

app = Flask(__name__)
cors = CORS(app)
load_dotenv()
pdf_model = load_model()
embeddings = load_embeddings()
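# The Gemini chat model and embeddings are created once at startup, so every
# request reuses them instead of re-initializing the clients per call.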
# # Define the model and feature extractor globally
# model = AutoModelForImageClassification.from_pretrained('carbon225/vit-base-patch16-224-hentai')
# feature_extractor = AutoFeatureExtractor.from_pretrained('carbon225/vit-base-patch16-224-hentai')
@app.route("/")  # route path assumed; the original decorator was lost in extraction
def default():
    return json.dumps({"Server": "Working"})
@app.route("/extract-images", methods=["GET"])  # route path assumed; original decorator lost in extraction
def extract_images():
    try:
        src = request.args.get("src")
        response = requests.get(src)
        soup = BeautifulSoup(response.content, 'html.parser')
        img_urls = []
        # Collect every <img> nested inside a <div> and resolve relative URLs
        # against the page that was fetched.
        img_tags = soup.select('div img')
        for img_tag in img_tags:
            if img_tag.get('src'):
                img_url = urljoin(src, img_tag['src'])
                img_urls.append(img_url)
        return json.dumps({"images": img_urls})
    except Exception as e:
        # Returning the exception object itself is not a valid Flask response;
        # serialize the message instead.
        return json.dumps({"error": str(e)})
api_key = os.getenv("GOOGLE_API_KEY")
genai.configure(api_key=api_key)
model = genai.GenerativeModel('gemini-pro')
sentiment_analysis = pipeline("sentiment-analysis", model="siebert/sentiment-roberta-large-english")
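# SiEBERT is a binary English sentiment classifier: each call returns a list
# like [{"label": "POSITIVE" | "NEGATIVE", "score": ...}], which is why the
# loop below only needs to check for the 'POSITIVE' label.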
@app.route("/sentiment", methods=["GET"])  # route path assumed; original decorator lost in extraction
def sentiment():
    df = pd.read_excel('./tweets.xlsx')
    reviews = df['text'][:100].tolist()
    pos_count = 0
    neg_count = 0
    positive_reviews = []
    negative_reviews = []
    for review in reviews:
        if sentiment_analysis(review)[0]['label'] == 'POSITIVE':
            positive_reviews.append(review)
            pos_count += 1
        else:
            negative_reviews.append(review)
            neg_count += 1
    # Persist the negative tweets so process_txt() can index them later.
    file_path = "negative_reviews.txt"
    with open(file_path, "w") as txt_file:
        for review in negative_reviews:
            txt_file.write(review + "\n")
    # Render the positive/negative split as a pie chart.
    activities = ['positive', 'negative']
    slices = [pos_count, neg_count]
    colors = ['g', 'r']
    plt.figure()  # fresh figure so repeated requests don't draw on stale state
    plt.pie(slices, labels=activities, colors=colors,
            startangle=90, shadow=True, explode=(0, 0),
            radius=1.2, autopct='%1.1f%%')
    plt.legend()
    plt.savefig('pie_chart.jpg', format='jpg')
    plt.close()
    return json.dumps({"message": 1})
# Getting key issues from customer feedback
@app.route("/process-txt", methods=["GET"])  # route path assumed; original decorator lost in extraction
def process_txt():
    loader = TextLoader("./negative_reviews.txt", encoding='UTF-8')
    documents = loader.load()
    # Split the reviews into overlapping chunks sized for embedding.
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    texts = text_splitter.split_documents(documents)
    # Build the Chroma index on disk, then reopen it from the persist directory.
    persist_directory = 'db'
    vectordb = Chroma.from_documents(documents=texts,
                                     embedding=embeddings,
                                     persist_directory=persist_directory)
    vectordb = Chroma(persist_directory=persist_directory,
                      embedding_function=embeddings)
    retriever = vectordb.as_retriever()
    query = "Suggest what are the key issues from the following customer tweets"
    prompt_template = """
    Answer the question
    Context:\n {context}?\n
    Question:\n{question}?\n
    Answer:
    """
    prompt = PromptTemplate(template=prompt_template, input_variables=["context", "question"])
    # "stuff" chain: all retrieved chunks are stuffed into the prompt as context.
    chain = load_qa_chain(pdf_model, chain_type="stuff", prompt=prompt)
    docs = retriever.get_relevant_documents(query)
    response = chain(
        {"input_documents": docs, "question": query},
        return_only_outputs=True)
    return json.dumps({"response": response['output_text']})
# @app.route("/predict", methods=["GET"])
# def predict():
#     try:
#         src = request.args.get("src")
#         # Download image from the provided URL
#         response = requests.get(src)
#         response.raise_for_status()
#         # Open and preprocess the image
#         image = Image.open(BytesIO(response.content))
#         image = image.resize((128, 128))
#         # Extract features using the pre-trained feature extractor
#         encoding = feature_extractor(images=image.convert("RGB"), return_tensors="pt")
#         # Make a prediction using the pre-trained model
#         with torch.no_grad():
#             outputs = model(**encoding)
#             logits = outputs.logits
#         # Get the predicted class index and label
#         predicted_class_idx = logits.argmax(-1).item()
#         predicted_class_label = model.config.id2label[predicted_class_idx]
#         # Return the predictions
#         return json.dumps({"class": predicted_class_label})
#     except requests.exceptions.RequestException as e:
#         return json.dumps({"error": f"Request error: {str(e)}"})
#     except Exception as e:
#         return json.dumps({"error": f"An unexpected error occurred: {str(e)}"})
@app.route("/answer", methods=["POST"])  # route path assumed; POST inferred from request.get_json()
def answer():
    query = request.get_json()['query']
    final_query = f"""
    Following are negative reviews about my products, suggest what are the key issues from the customer feedback:{query}
    """
    response = model.generate_content(final_query)
    return json.dumps({"message": response.text})
if __name__ == "__main__":
    app.run(debug=True)
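
# Example requests against the assumed routes above (the original decorators
# were lost in extraction, so adjust paths to your deployment; the Flask dev
# server defaults to http://localhost:5000):
#   curl "http://localhost:5000/"
#   curl "http://localhost:5000/extract-images?src=https://example.com"
#   curl "http://localhost:5000/sentiment"
#   curl "http://localhost:5000/process-txt"
#   curl -X POST -H "Content-Type: application/json" \
#       -d '{"query": "screen cracked within a week"}' \
#       "http://localhost:5000/answer"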