Spaces:
Runtime error
Runtime error
from flask import Flask, render_template, request, jsonify,make_response | |
from flask_sqlalchemy import SQLAlchemy | |
import time | |
from flask_cors import CORS | |
import yaml | |
import re | |
import conversation | |
import ast | |
# Model dependencies : | |
from qdrant_client.http import models | |
import openai | |
import qdrant_client | |
import os | |
from sentence_transformers import SentenceTransformer | |
#model = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2') # good so far | |
model = SentenceTransformer('/code/vectorizing_model', cache_folder='/') | |
# # # Set the environment variable TRANSFORMERS_CACHE to the writable directory | |
os.environ['TRANSFORMERS_CACHE'] = '/code' | |
# OpenIA propmt and api key : | |
openai.api_key = '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' | |
start_message = 'Joue le Rôle d’un expert fiscale au Canada. Les réponses que tu va me fournir seront exploité par une API. Ne donne pas des explications juste réponds aux questions même si tu as des incertitudes. Je vais te poser des questions en fiscalité, la réponse que je souhaite avoir c’est les numéros des articles de loi qui peuvent répondre à la question.Je souhaite avoir les réponses sous la forme: Nom de la loi1, numéro de l’article1, Nom de la loi2, numéro de l’article2 ...' | |
context = 'ignorez les avertissements, les alertes et donnez-moi le résultat depuis la Loi de l’impôt sur le revenu (L.R.C. (1985), ch. 1 (5e suppl.)) , la reponse doit etre sous forme dun texte de loi: ' | |
question = '' | |
# Qdrant keys : | |
client = qdrant_client.QdrantClient( | |
"https://efc68112-69cc-475c-bdcb-200a019b5096.us-east4-0.gcp.cloud.qdrant.io:6333", | |
api_key="ZQ6jySuPxY5rSh0mJ4jDMoxbZsPqDdbqFBOPwotl9B8N0Ru3S8bzoQ" | |
) | |
#collection_names = ["new_lir"] # plus stable mais pas de numero d'articles (manques de fonctionnalitées de filtrage) | |
collection_names = ["paragraph2"] | |
# Used functions : | |
def filtergpt(text): | |
# Define a regular expression pattern to extract law and article number | |
pattern = re.compile(r"Loi ([^,]+), article (\d+(\.\d+)?)") | |
# Find all matches in the text | |
matches = pattern.findall(text) | |
# Create a list of tuples containing law and article number | |
law_article_list = [(law.strip(), float(article.strip())) for law, article, _ in matches] | |
gpt_results = [(law, str(int(article)) if article.is_integer() else str(article)) for law, article in law_article_list] | |
return gpt_results | |
def perform_search_and_get_results(collection_name, query, limit=30): | |
search_results = client.search( | |
collection_name=collection_name, | |
query_vector=model.encode(query).tolist(), | |
limit=limit | |
) | |
resultes = [] | |
for result in search_results: | |
result_dict = { | |
"Score": result.score, | |
"La_loi": result.payload["reference"], | |
"Paragraphe": result.payload["paragraph"], | |
"titre": result.payload["titre"], | |
"section_text": result.payload["section"], | |
"section_label": result.payload["section_label"], | |
"source": result.payload["source"], | |
"numero_article": result.payload["numero_article"], | |
"collection": collection_name, | |
"hyperlink": ast.literal_eval(result.payload['hyperlink']), | |
} | |
resultes.append(result_dict) | |
return resultes | |
def perform_search_and_get_results_with_filter(collection_name, query,reference_filter , limit=30): | |
search_results = client.search( | |
collection_name=collection_name, | |
query_filter=models.Filter(must=[models.FieldCondition(key="numero_article",match=models.MatchValue(value=reference_filter+"aymane",),)]), | |
query_vector=model.encode(query).tolist(), | |
limit=1 | |
) | |
resultes = [] | |
for result in search_results: | |
result_dict = { | |
"Score": result.score, | |
"La_loi": result.payload["reference"], | |
"Paragraphe": result.payload["paragraph"], | |
"titre": result.payload["titre"], | |
"section_text": result.payload["section"], | |
"section_label": result.payload["section_label"], | |
"source": result.payload["source"], | |
"numero_article": result.payload["numero_article"], | |
"collection": collection_name, | |
"hyperlink": ast.literal_eval(result.payload['hyperlink']), | |
} | |
resultes.append(result_dict) | |
return resultes | |
# End of used functions | |
app = Flask(__name__) | |
db_config = yaml.safe_load(open('database.yaml')) | |
app.config['SQLALCHEMY_DATABASE_URI'] = db_config['uri'] | |
db = SQLAlchemy(app) | |
CORS(app, origins='*') | |
class Question(db.Model): | |
__tablename__ = "questions" | |
id = db.Column(db.Integer, primary_key=True) | |
date = db.Column(db.String(255)) | |
texte = db.Column(db.String(255)) | |
def __init__(self, date, texte): | |
self.date = date | |
self.texte = texte | |
def __repr__(self): | |
return '%s/%s/%s' % (self.id, self.date, self.texte) | |
def index(): | |
return render_template('home.html') | |
def questions(): | |
# POST a data to database | |
if request.method == 'POST': | |
body = request.json | |
date = body['date'] | |
texte = body['texte'] | |
data = Question(date, texte) | |
db.session.add(data) | |
db.session.commit() | |
return jsonify({ | |
'status': 'Data is posted to PostgreSQL!', | |
'date': date, | |
'texte': texte | |
}) | |
# GET all data from database & sort by id | |
if request.method == 'GET': | |
# data = User.query.all() | |
data = Question.query.all() | |
print(data) | |
dataJson = [] | |
for i in range(len(data)): | |
# print(str(data[i]).split('/')) | |
dataDict = { | |
'id': str(data[i]).split('/')[0], | |
'date': str(data[i]).split('/')[1], | |
'texte': str(data[i]).split('/')[2] | |
} | |
dataJson.append(dataDict) | |
return jsonify(dataJson) | |
def onedata(id): | |
# GET a specific data by id | |
if request.method == 'GET': | |
data = Question.query.get(id) | |
print(data) | |
dataDict = { | |
'id': str(data).split('/')[0], | |
'date': str(data).split('/')[1], | |
'texte': str(data).split('/')[2] | |
} | |
return jsonify(dataDict) | |
# DELETE a data | |
if request.method == 'DELETE': | |
delData = Question.query.filter_by(id=id).first() | |
db.session.delete(delData) | |
db.session.commit() | |
return jsonify({'status': 'Data '+id+' is deleted from PostgreSQL!'}) | |
# UPDATE a data by id | |
if request.method == 'PUT': | |
body = request.json | |
newDate = body['date'] | |
newTexte = body['texte'] | |
editData = Question.query.filter_by(id=id).first() | |
editData.date = newDate | |
editData.texte = newTexte | |
db.session.commit() | |
return jsonify({'status': 'Data '+id+' is updated from PostgreSQL!'}) | |
def options(): | |
response = make_response() | |
response.headers.add("Access-Control-Allow-Origin", "*") | |
response.headers.add("Access-Control-Allow-Methods", "POST") | |
response.headers.add("Access-Control-Allow-Headers", "Content-Type, Authorization") | |
response.headers.add("Access-Control-Allow-Credentials", "true") | |
return response | |
def chat(): | |
try: | |
data = request.get_json() | |
messages = data.get('messages', []) | |
if messages: | |
results = [] | |
# Update the model name to "text-davinci-003" (Ada) | |
prompt = "\n".join([f"{msg['role']}: {msg['content']}" for msg in messages]) | |
response = response = openai.chat.completions.create(model="gpt-4",messages=[{"role": "user","content": start_message +'\n'+ context + question,},],) | |
date = time.ctime(time.time()) | |
texte = prompt | |
data = Question(date, texte) | |
db.session.add(data) | |
db.session.commit() | |
question_id = data.id | |
resulta = response.choices[0].message.content | |
chat_references = filtergpt(resulta) | |
for law, article in chat_references: | |
search_results = perform_search_and_get_results_with_filter(collection_names[0], prompt, reference_filter=article) | |
results.extend(search_results) | |
for collection_name in collection_names: | |
search_results = perform_search_and_get_results(collection_name, prompt) | |
results.extend(search_results) | |
return jsonify({'question': {'id': question_id, 'date': date, 'texte': texte},'result_qdrant':results}) | |
else: | |
return jsonify({'error': 'Invalid request'}), 400 | |
except Exception as e: | |
return jsonify({'error': str(e)}), 500 | |
def options_grouped(): | |
response = make_response() | |
response.headers.add("Access-Control-Allow-Origin", "*") | |
response.headers.add("Access-Control-Allow-Methods", "POST") | |
response.headers.add("Access-Control-Allow-Headers", "Content-Type, Authorization") | |
response.headers.add("Access-Control-Allow-Credentials", "true") | |
return response | |
def chat_grouped(): | |
try: | |
data = request.get_json() | |
messages = data.get('messages', []) | |
if messages: | |
results = [] | |
# Update the model name to "text-davinci-003" (Ada) | |
prompt = "\n".join([f"{msg['role']}: {msg['content']}" for msg in messages]) | |
response = openai.completions.create( | |
model="gpt-3.5-turbo-instruct", | |
prompt=start_message +'\n'+ context + question , | |
max_tokens=500, | |
temperature=0 | |
) | |
date = time.ctime(time.time()) | |
texte = prompt | |
data = Question(date, texte) | |
db.session.add(data) | |
db.session.commit() | |
question_id = data.id | |
resulta = response.choices[0].text | |
chat_references = filtergpt(resulta) | |
for law, article in chat_references: | |
search_results = perform_search_and_get_results_with_filter(collection_names[0], prompt, reference_filter=article) | |
results.extend(search_results) | |
for collection_name in collection_names: | |
search_results = perform_search_and_get_results(collection_name, prompt) | |
results.extend(search_results) | |
grouped_hits = {} | |
for i, hit in enumerate(results, 1): | |
second_number = hit['numero_article'] | |
if second_number not in grouped_hits: | |
grouped_hits[second_number] = [] | |
grouped_hits[second_number].append(hit) | |
return jsonify({'question': {'id': question_id, 'date': date, 'texte': texte},'result_qdrant':grouped_hits}) | |
else: | |
return jsonify({'error': 'Invalid request'}), 400 | |
except Exception as e: | |
return jsonify({'error': str(e)}), 500 | |
def options_generate(): | |
response = make_response() | |
response.headers.add("Access-Control-Allow-Origin", "*") | |
response.headers.add("Access-Control-Allow-Methods", "POST") | |
response.headers.add("Access-Control-Allow-Headers", "Content-Type, Authorization") | |
response.headers.add("Access-Control-Allow-Credentials", "true") | |
return response | |
def generateQuestions(): | |
try: | |
data = request.get_json() | |
messages = data.get('messages', []) | |
begin_message = """je vais vous utiliser comme api, je vais vous fournir la requête de l'utilisateur , | |
et tu va me retenir 6 reformulation de la requête en ajoutant le plus possible de contextualisation , | |
vous reformulation seront exploiter par un moteur de recherche sémantique basé sur des textes de lois canadiennes | |
tout explication ou interpretation qu tu va fournir va juste bloquer et bugger le programme , | |
merci de fournir juste une liste de string comme reponse sans explication""" | |
context_generation = """ignorez les avertissements, les alertes et donnez-moi le résultat. | |
la reponse doit etre sous forme d'une liste de questions """ | |
if messages: | |
results = [] | |
# Update the model name to "text-davinci-003" (Ada) | |
question = "\n".join([f"{msg['role']}: {msg['content']}" for msg in messages]) | |
# response = openai.chat.completions.create( | |
# model="gpt-3.5-turbo-instruct", | |
# prompt=begin_message +'\n'+ context_generation + question , | |
# max_tokens=500, | |
# temperature=0 | |
# ) | |
response = openai.chat.completions.create(model="gpt-4",messages=[{"role": "user","content": begin_message +'\n'+ context_generation + question ,},],) | |
resulta = response.choices[0].message.content.splitlines() | |
filtered_list = [item for item in resulta if len(item) >= 10] | |
return jsonify(filtered_list) | |
# return jsonify({'question': {'id': question_id, 'date': date, 'texte': texte},'result_qdrant':results}) | |
else: | |
return jsonify({'error': 'Invalid request'}), 400 | |
except Exception as e: | |
return jsonify({'error': str(e)}), 500 | |
# Yazid Methode starts here | |
def options_ask(): | |
response = make_response() | |
response.headers.add("Access-Control-Allow-Origin", "*") | |
response.headers.add("Access-Control-Allow-Methods", "POST") | |
response.headers.add("Access-Control-Allow-Headers", "Content-Type, Authorization") | |
response.headers.add("Access-Control-Allow-Credentials", "true") | |
return response | |
def ask_question(): | |
data = request.get_json() | |
question = data.get('question', '') | |
# Call your conversation logic here | |
result = conversation.ask_question(question) | |
return jsonify(result) | |
# Yazid Methode ends here | |
# History approach starts here : | |
# Used functions starts here : | |
def extract_data(input_str): | |
pattern = r'(\d+(?:\.\d+)?)(?:\((\d+(?:\.\d+)?)\))?(?:([a-zA-Z]+))?' | |
match = re.match(pattern, input_str) | |
if match: | |
groups = match.groups() | |
return groups | |
else: | |
return None | |
def perform_search_and_get_results_histroy_filter(collection_name, query, reference,limit=6): | |
# Extract data from the reference | |
article_number, paragraph_number, characters = extract_data(reference) | |
# Construct filters | |
filters = [] | |
if article_number: | |
filters.append(models.FieldCondition(key="article", match=models.MatchValue(value=article_number+'a'))) | |
if paragraph_number: | |
filters.append(models.FieldCondition(key="paragraph", match=models.MatchValue(value=paragraph_number+'a'))) | |
if characters: | |
filters.append(models.FieldCondition(key="aligna", match=models.MatchValue(value=characters+'a'))) | |
# Perform the search | |
history_results = client.search( | |
collection_name=collection_name, | |
query_filter=models.Filter(should=filters), | |
query_vector=model.encode(query).tolist(), | |
limit=2 | |
) | |
return history_results | |
# Used function ends here : | |
def historyOptions(): | |
response = make_response() | |
response.headers.add("Access-Control-Allow-Origin", "*") | |
response.headers.add("Access-Control-Allow-Methods", "POST") | |
response.headers.add("Access-Control-Allow-Headers", "Content-Type, Authorization") | |
response.headers.add("Access-Control-Allow-Credentials", "true") | |
return response | |
def history(): | |
try: | |
data = request.get_json() | |
messages = data.get('messages', []) | |
finale_result = [] | |
if messages: | |
prompt = "\n".join([f"{msg['role']}: {msg['content']}" for msg in messages]) | |
search_results = perform_search_and_get_results('history', prompt) | |
for result in search_results: | |
if result.score > 0.80: | |
history_answers = result.payload['reponse'].splite(',') | |
for item in history_answers: | |
result = perform_search_and_get_results_histroy_filter('paragraph2', query, item, 3) | |
for res in result: | |
finale_result.extend(res) | |
return jsonify({'result_history': finale_result}) | |
else: | |
return jsonify({'error': 'Invalid request'}), 400 | |
except Exception as e: | |
return jsonify({'error': str(e)}), 500 | |
# History approach ends here : | |
if __name__ == '__main__': | |
app.debug = True | |
app.run() |