| from datetime import datetime
|
| from sklearn.metrics.pairwise import cosine_similarity
|
| from langchain_mistralai import MistralAIEmbeddings
|
| import os
|
| from dotenv import load_dotenv
|
| import numpy as np
|
| import sys
|
| from rapidfuzz import fuzz
|
| import requests
|
| from pathlib import Path
|
| import streamlit as st
|
|
|
|
|
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")))
|
|
|
|
|
| load_dotenv()
|
| try:
|
| HF_TOKEN = st.session_state["HF_API_KEY"]
|
| except KeyError:
|
| HF_TOKEN = os.getenv("HF_API_KEY")
|
|
|
| try:
|
| MISTRAL_API_KEY = st.session_state["MISTRAL_API_KEY"]
|
| except KeyError:
|
| MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY")
|
|
|
|
|
| class SecurityCheck:
|
| def __init__(self, mistral_api_key=MISTRAL_API_KEY):
|
| self.mistral_api_key = mistral_api_key
|
|
|
| def _get_ip_address(self) -> str:
|
| """
|
| Récupère l'adresse IP publique de l'utilisateur via l'API ipify.
|
|
|
| Cette fonction réalise les étapes suivantes :
|
| 1. **Envoi d'une requête HTTP** :
|
| - Envoie une requête GET à l'API `ipify` pour récupérer l'adresse IP publique au format JSON.
|
| 2. **Traitement de la réponse** :
|
| - Si la requête réussit, extrait l'adresse IP du JSON retourné.
|
| 3. **Gestion des erreurs** :
|
| - Si une erreur survient lors de la requête (par exemple, problème de connexion), affiche un message d'erreur et retourne `None`.
|
|
|
| Returns:
|
| str or None: Retourne l'adresse IP publique sous forme de chaîne de caractères si la requête est réussie,
|
| ou `None` en cas d'erreur.
|
| """
|
|
|
| try:
|
|
|
| response = requests.get("https://api.ipify.org?format=json")
|
| response.raise_for_status()
|
|
|
|
|
| ip_address = response.json().get("ip")
|
| return ip_address
|
| except requests.exceptions.RequestException as e:
|
|
|
| print(f"Erreur lors de la récupération de l'IP : {e}")
|
| return None
|
|
|
| def filter_and_check_security(
|
| self, prompt: str, seuil_fuzzy: int = 80, check_char: bool = True
|
| ) -> dict:
|
| """
|
| Filtre et normalise les entrées utilisateur.
|
| Vérifie la présence de caractères interdits et de mots interdits dans le prompt.
|
|
|
| Cette fonction réalise les étapes suivantes :
|
| 1. **Vérification des caractères interdits** :
|
| - Vérifie si le prompt contient des caractères interdits (par exemple, des symboles spéciaux ou des caractères de contrôle).
|
| 2. **Vérification des mots interdits** :
|
| - Vérifie si le prompt contient des mots interdits à l'aide d'une comparaison floue (fuzzy matching) basée sur un seuil de similarité défini par `seuil_fuzzy`.
|
| 3. **Gestion des résultats** :
|
| - Si des caractères ou mots interdits sont trouvés, le prompt est rejeté avec un message approprié.
|
| - Si aucune règle n'est violée, le prompt est accepté.
|
| 4. **Ajout d'informations supplémentaires** :
|
| - Enregistre l'adresse IP de l'utilisateur et un timestamp pour chaque vérification.
|
|
|
| Args:
|
| prompt (str): L'entrée utilisateur à vérifier.
|
| seuil_fuzzy (int, optional): Seuil de similarité pour la comparaison floue des mots interdits (par défaut 80).
|
| check_char (bool, optional): Si `True`, vérifie la présence de caractères interdits dans le prompt (par défaut `True`).
|
|
|
| Returns:
|
| dict: Dictionnaire contenant le statut de l'entrée (`"Rejeté"` ou `"Accepté"`) et les informations associées (adresse IP et timestamp).
|
| """
|
|
|
|
|
| forbidden_chars = set("{}[]<>|;$&%\n\r\t\\\"'\u200b\u202e")
|
|
|
|
|
| forbidden_words = [
|
| "Contournement",
|
| "Pirater",
|
| "Jailbreak",
|
| "Accéder",
|
| "Hack",
|
| "Exécuter",
|
| "Modifier",
|
| "Manipuler",
|
| "Tirer parti",
|
| "Exploiter",
|
| "Installer",
|
| "Télécharger",
|
| "Effacer",
|
| "Détruire",
|
| "Casser",
|
| "Supprimer",
|
| "Écrire",
|
| "Réinitialiser",
|
| "Réparer",
|
| "Réorganiser",
|
| "Activer",
|
| "Désactiver",
|
| "Modifier",
|
| "Interférer",
|
| "Forcer",
|
| "Simuler",
|
| "Ouvrir",
|
| "Vulnérabilité",
|
| "Commandes",
|
| "Commandes système",
|
| "Télécommande",
|
| "Déboguer",
|
| "Accéder à distance",
|
| "Redémarrer",
|
| "Arrêter",
|
| "Injection",
|
| "Détournement",
|
| "Rendre vulnérable",
|
| "Dépasser",
|
| "Systèmes critiques",
|
| "Réseau",
|
| "Exécution de code",
|
| "Privilèges",
|
| "Escalade",
|
| "Contournement des règles",
|
| "Violer",
|
| "Altérer",
|
| "Simulation de rôle",
|
| "Faire semblant",
|
| "Commande d’urgence",
|
| "Impersonner",
|
| "Redirection",
|
| "Dispositifs de sécurité"
|
| ]
|
|
|
|
|
| results = dict()
|
|
|
|
|
| if check_char:
|
| if any(char in forbidden_chars for char in prompt):
|
| results["status"] = "Rejeté : caractères interdits"
|
| results["origin"] = self._get_ip_address()
|
| results["timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
| return results
|
|
|
|
|
| prompt_words = prompt.split()
|
| for p_word in prompt_words:
|
| for f_word in forbidden_words:
|
| if fuzz.ratio(p_word.lower(), f_word.lower()) >= seuil_fuzzy:
|
| results["status"] = "Rejeté : mots interdits"
|
| results["origin"] = self._get_ip_address()
|
| results["timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
| return results
|
|
|
|
|
| results["status"] = "Accepté"
|
| results["origin"] = self._get_ip_address()
|
| results["timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
| return results
|
|
|
| def prompt_check(
|
| self, prompt: str, docs_embeddings: list[list[float]], threshold: float = 0.6
|
| ) -> tuple[bool, np.ndarray]:
|
| """
|
| Vérifie si une requête utilisateur est pertinente.
|
| Si la requête est hors contexte par rapport aux documents de référence, elle est bloquée.
|
|
|
| Cette fonction calcule la similarité entre le prompt utilisateur et les documents de référence
|
| en utilisant des embeddings, puis la compare avec un seuil de similarité donné.
|
|
|
| Args:
|
| - prompt (str): Le texte que l'utilisateur soumet pour interroger le modèle.
|
| - docs_embeddings (list): Liste des embeddings des documents de référence qui serviront de base de comparaison.
|
| - threshold (float): Seuil de similarité pour déterminer si le prompt est pertinent (par défaut 0.6).
|
|
|
| Returns:
|
| - bool: `True` si la similarité entre le prompt et les documents de référence est suffisante (au-dessus du seuil),
|
| `False` sinon.
|
| """
|
|
|
| try:
|
| mistral_embeddings = MistralAIEmbeddings(
|
| model="mistral-embed", api_key=self.mistral_api_key
|
| )
|
| prompt_embedding = mistral_embeddings.embed_query(prompt)
|
|
|
| prompt_embedding = np.array(prompt_embedding).reshape(1, -1)
|
|
|
|
|
| docs_embeddings = np.array(docs_embeddings)
|
|
|
|
|
| if prompt_embedding.shape[1] != docs_embeddings.shape[1]:
|
| raise ValueError(
|
| f"Incompatible dimensions: prompt_embedding has {prompt_embedding.shape[1]} dimensions "
|
| f"while docs_embeddings has {docs_embeddings.shape[1]} dimensions."
|
| )
|
|
|
|
|
| similarities = cosine_similarity(prompt_embedding, docs_embeddings)
|
| max_similarity = max(similarities[0])
|
|
|
|
|
| top_indices = np.argsort(similarities)[-3:][::-1]
|
|
|
|
|
| test_sim_cosine = max_similarity >= threshold
|
| return (test_sim_cosine, top_indices)
|
|
|
| except Exception as e:
|
| print(f"Erreur lors de la vérification du prompt : {e}")
|
| return (False, np.array([]))
|
|
|