Spaces:
Running
Running
param-bharat
commited on
Commit
•
f4dbf56
1
Parent(s):
08d6b95
feat: upgrade and improve secrets detection
Browse files- guardrails_genie/guardrails/secrets_detection/__init__.py +2 -4
- guardrails_genie/guardrails/secrets_detection/secrets_detection.py +350 -86
- guardrails_genie/guardrails/secrets_detection/secrets_patterns.jsonl +0 -0
- pyproject.toml +4 -0
- tests/guardrails_genie/guardrails/test_secrets_detection.py +21 -29
guardrails_genie/guardrails/secrets_detection/__init__.py
CHANGED
@@ -1,17 +1,15 @@
|
|
1 |
from guardrails_genie.guardrails.secrets_detection.secrets_detection import (
|
2 |
-
DEFAULT_SECRETS_PATTERNS,
|
3 |
SecretsDetectionGuardrail,
|
4 |
SecretsDetectionSimpleResponse,
|
5 |
SecretsDetectionResponse,
|
6 |
REDACTION,
|
7 |
-
|
8 |
)
|
9 |
|
10 |
__all__ = [
|
11 |
-
"DEFAULT_SECRETS_PATTERNS",
|
12 |
"SecretsDetectionGuardrail",
|
13 |
"SecretsDetectionSimpleResponse",
|
14 |
"SecretsDetectionResponse",
|
15 |
"REDACTION",
|
16 |
-
"
|
17 |
]
|
|
|
1 |
from guardrails_genie.guardrails.secrets_detection.secrets_detection import (
|
|
|
2 |
SecretsDetectionGuardrail,
|
3 |
SecretsDetectionSimpleResponse,
|
4 |
SecretsDetectionResponse,
|
5 |
REDACTION,
|
6 |
+
redact_value,
|
7 |
)
|
8 |
|
9 |
__all__ = [
|
|
|
10 |
"SecretsDetectionGuardrail",
|
11 |
"SecretsDetectionSimpleResponse",
|
12 |
"SecretsDetectionResponse",
|
13 |
"REDACTION",
|
14 |
+
"redact_value",
|
15 |
]
|
guardrails_genie/guardrails/secrets_detection/secrets_detection.py
CHANGED
@@ -1,41 +1,30 @@
|
|
1 |
import hashlib
|
2 |
import json
|
|
|
3 |
import pathlib
|
|
|
4 |
from enum import Enum
|
5 |
-
from typing import
|
6 |
|
7 |
import weave
|
8 |
-
from pydantic import BaseModel
|
9 |
|
10 |
from guardrails_genie.guardrails.base import Guardrail
|
11 |
-
from guardrails_genie.regex_model import RegexModel
|
12 |
|
13 |
-
|
14 |
-
|
15 |
-
|
16 |
-
|
17 |
-
|
18 |
-
|
19 |
-
|
20 |
-
|
21 |
-
|
22 |
-
patterns = (
|
23 |
-
pathlib.Path(__file__).parent.absolute() / "secrets_patterns.jsonl"
|
24 |
-
).read_text()
|
25 |
-
|
26 |
-
for pattern in patterns.splitlines():
|
27 |
-
pattern = json.loads(pattern)
|
28 |
-
default_patterns[pattern["name"]] = [rf"{pat}" for pat in pattern["patterns"]]
|
29 |
-
return default_patterns
|
30 |
-
|
31 |
-
|
32 |
-
# Load default secret patterns from the JSONL file
|
33 |
-
DEFAULT_SECRETS_PATTERNS = load_secrets_patterns()
|
34 |
|
35 |
|
36 |
class REDACTION(str, Enum):
|
37 |
"""
|
38 |
-
Enum for different types of redaction
|
39 |
"""
|
40 |
|
41 |
REDACT_PARTIAL = "REDACT_PARTIAL"
|
@@ -44,31 +33,31 @@ class REDACTION(str, Enum):
|
|
44 |
REDACT_NONE = "REDACT_NONE"
|
45 |
|
46 |
|
47 |
-
def
|
48 |
"""
|
49 |
-
|
50 |
|
51 |
Args:
|
52 |
-
|
53 |
-
|
54 |
-
|
|
|
|
|
|
|
55 |
|
56 |
Returns:
|
57 |
-
str: The redacted
|
58 |
-
"""
|
59 |
-
|
60 |
-
|
61 |
-
|
62 |
-
|
63 |
-
|
64 |
-
|
65 |
-
|
66 |
-
|
67 |
-
|
68 |
-
|
69 |
-
replacement = match
|
70 |
-
text = text.replace(match, replacement)
|
71 |
-
return text
|
72 |
|
73 |
|
74 |
class SecretsDetectionSimpleResponse(BaseModel):
|
@@ -79,11 +68,13 @@ class SecretsDetectionSimpleResponse(BaseModel):
|
|
79 |
contains_secrets (bool): Indicates if secrets were detected.
|
80 |
explanation (str): Explanation of the detection result.
|
81 |
redacted_text (Optional[str]): The redacted text if secrets were found.
|
|
|
82 |
"""
|
83 |
|
84 |
contains_secrets: bool
|
85 |
explanation: str
|
86 |
redacted_text: Optional[str] = None
|
|
|
87 |
|
88 |
@property
|
89 |
def safe(self) -> bool:
|
@@ -104,54 +95,329 @@ class SecretsDetectionResponse(SecretsDetectionSimpleResponse):
|
|
104 |
detected_secrets (dict[str, list[str]]): Dictionary of detected secrets.
|
105 |
"""
|
106 |
|
107 |
-
detected_secrets: dict[str,
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
108 |
|
109 |
|
110 |
class SecretsDetectionGuardrail(Guardrail):
|
111 |
"""
|
112 |
-
|
113 |
-
reference: SecretBench: A Dataset of Software Secrets
|
114 |
-
https://arxiv.org/abs/2303.06729
|
115 |
|
116 |
Attributes:
|
117 |
-
|
118 |
-
|
119 |
-
|
120 |
"""
|
121 |
|
122 |
-
regex_model: RegexModel
|
123 |
-
patterns: Union[dict[str, str], dict[str, list[str]]] = {}
|
124 |
redaction: REDACTION
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
125 |
|
126 |
def __init__(
|
127 |
self,
|
128 |
-
use_defaults: bool = True,
|
129 |
redaction: REDACTION = REDACTION.REDACT_ALL,
|
130 |
**kwargs,
|
131 |
):
|
132 |
"""
|
133 |
-
|
134 |
|
135 |
Args:
|
136 |
-
|
137 |
-
redaction (REDACTION): The type of redaction to apply.
|
138 |
**kwargs: Additional keyword arguments.
|
139 |
"""
|
140 |
-
patterns = {}
|
141 |
-
if use_defaults:
|
142 |
-
patterns = DEFAULT_SECRETS_PATTERNS.copy()
|
143 |
-
if kwargs.get("patterns"):
|
144 |
-
patterns.update(kwargs["patterns"])
|
145 |
-
|
146 |
-
regex_model = RegexModel(patterns=patterns)
|
147 |
-
|
148 |
super().__init__(
|
149 |
-
regex_model=regex_model,
|
150 |
-
patterns=patterns,
|
151 |
redaction=redaction,
|
152 |
)
|
153 |
|
154 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
155 |
def guard(
|
156 |
self,
|
157 |
prompt: str,
|
@@ -159,40 +425,38 @@ class SecretsDetectionGuardrail(Guardrail):
|
|
159 |
**kwargs,
|
160 |
) -> SecretsDetectionResponse | SecretsDetectionResponse:
|
161 |
"""
|
162 |
-
|
163 |
|
164 |
Args:
|
165 |
-
prompt (str):
|
166 |
-
return_detected_secrets (bool):
|
|
|
167 |
|
168 |
Returns:
|
169 |
-
SecretsDetectionResponse
|
170 |
"""
|
171 |
-
|
172 |
|
173 |
explanation_parts = []
|
174 |
-
if
|
175 |
explanation_parts.append("Found the following secrets in the text:")
|
176 |
-
for secret_type, matches in
|
177 |
explanation_parts.append(f"- {secret_type}: {len(matches)} instance(s)")
|
178 |
else:
|
179 |
explanation_parts.append("No secrets detected in the text.")
|
180 |
|
181 |
-
redacted_text = prompt
|
182 |
-
if result.matched_patterns:
|
183 |
-
for secret_type, matches in result.matched_patterns.items():
|
184 |
-
redacted_text = redact(redacted_text, matches, self.redaction)
|
185 |
-
|
186 |
if return_detected_secrets:
|
187 |
return SecretsDetectionResponse(
|
188 |
-
contains_secrets=
|
189 |
-
detected_secrets=
|
190 |
explanation="\n".join(explanation_parts),
|
191 |
-
redacted_text=
|
|
|
192 |
)
|
193 |
else:
|
194 |
return SecretsDetectionSimpleResponse(
|
195 |
-
contains_secrets=not
|
196 |
explanation="\n".join(explanation_parts),
|
197 |
-
redacted_text=
|
|
|
198 |
)
|
|
|
1 |
import hashlib
|
2 |
import json
|
3 |
+
import os
|
4 |
import pathlib
|
5 |
+
import tempfile
|
6 |
from enum import Enum
|
7 |
+
from typing import Optional, Any
|
8 |
|
9 |
import weave
|
10 |
+
from pydantic import BaseModel, PrivateAttr
|
11 |
|
12 |
from guardrails_genie.guardrails.base import Guardrail
|
|
|
13 |
|
14 |
+
try:
|
15 |
+
from detect_secrets import SecretsCollection
|
16 |
+
from detect_secrets.settings import default_settings
|
17 |
+
import hyperscan
|
18 |
+
except ImportError:
|
19 |
+
raise ImportError(
|
20 |
+
"The `detect-secrets` and the `hyperscan` packages are required for using the SecretsGuardrail. "
|
21 |
+
"Please install then by running `pip install detect-secrets hyperscan`."
|
22 |
+
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
23 |
|
24 |
|
25 |
class REDACTION(str, Enum):
|
26 |
"""
|
27 |
+
Enum for different types of redaction modes.
|
28 |
"""
|
29 |
|
30 |
REDACT_PARTIAL = "REDACT_PARTIAL"
|
|
|
33 |
REDACT_NONE = "REDACT_NONE"
|
34 |
|
35 |
|
36 |
+
def redact_value(value: str, mode: str) -> str:
|
37 |
"""
|
38 |
+
Redacts the given value based on the specified redaction mode.
|
39 |
|
40 |
Args:
|
41 |
+
value (str): The string value to be redacted.
|
42 |
+
mode (str): The redaction mode to be applied. It can be one of the following:
|
43 |
+
- REDACTION.REDACT_PARTIAL: Partially redacts the value.
|
44 |
+
- REDACTION.REDACT_ALL: Fully redacts the value.
|
45 |
+
- REDACTION.REDACT_HASH: Redacts the value by hashing it.
|
46 |
+
- REDACTION.REDACT_NONE: No redaction is applied.
|
47 |
|
48 |
Returns:
|
49 |
+
str: The redacted value based on the specified mode.
|
50 |
+
"""
|
51 |
+
replacement = value
|
52 |
+
if mode == REDACTION.REDACT_PARTIAL:
|
53 |
+
replacement = "[REDACTED:]" + value[:2] + ".." + value[-2:] + "[:REDACTED]"
|
54 |
+
elif mode == REDACTION.REDACT_ALL:
|
55 |
+
replacement = "[REDACTED:]" + ("*" * len(value)) + "[:REDACTED]"
|
56 |
+
elif mode == REDACTION.REDACT_HASH:
|
57 |
+
replacement = (
|
58 |
+
"[REDACTED:]" + hashlib.md5(value.encode()).hexdigest() + "[:REDACTED]"
|
59 |
+
)
|
60 |
+
return replacement
|
|
|
|
|
|
|
61 |
|
62 |
|
63 |
class SecretsDetectionSimpleResponse(BaseModel):
|
|
|
68 |
contains_secrets (bool): Indicates if secrets were detected.
|
69 |
explanation (str): Explanation of the detection result.
|
70 |
redacted_text (Optional[str]): The redacted text if secrets were found.
|
71 |
+
risk_score (float): The risk score of the detection result. (0.0, 0.5, 1.0)
|
72 |
"""
|
73 |
|
74 |
contains_secrets: bool
|
75 |
explanation: str
|
76 |
redacted_text: Optional[str] = None
|
77 |
+
risk_score: float = 0.0
|
78 |
|
79 |
@property
|
80 |
def safe(self) -> bool:
|
|
|
95 |
detected_secrets (dict[str, list[str]]): Dictionary of detected secrets.
|
96 |
"""
|
97 |
|
98 |
+
detected_secrets: dict[str, Any] | None = None
|
99 |
+
|
100 |
+
|
101 |
+
class SecretsInfo(BaseModel):
|
102 |
+
"""
|
103 |
+
Model representing information about a detected secret.
|
104 |
+
|
105 |
+
Attributes:
|
106 |
+
secret (str): The detected secret value.
|
107 |
+
line_number (int): The line number where the secret was found.
|
108 |
+
"""
|
109 |
+
|
110 |
+
secret: str
|
111 |
+
line_number: int
|
112 |
+
|
113 |
+
|
114 |
+
class ScanResult(BaseModel):
|
115 |
+
"""
|
116 |
+
Model representing the result of a secrets scan.
|
117 |
+
|
118 |
+
Attributes:
|
119 |
+
detected_secrets (dict[str, Any] | None): Dictionary of detected secrets, or None if no secrets were found.
|
120 |
+
modified_prompt (str): The modified prompt with secrets redacted.
|
121 |
+
has_secret (bool): Indicates if any secrets were detected.
|
122 |
+
risk_score (float): The risk score of the detection result.
|
123 |
+
"""
|
124 |
+
|
125 |
+
detected_secrets: dict[str, Any] | None = None
|
126 |
+
modified_prompt: str
|
127 |
+
has_secret: bool
|
128 |
+
risk_score: float
|
129 |
+
|
130 |
+
|
131 |
+
class DetectSecretsModel(weave.Model):
|
132 |
+
"""
|
133 |
+
Model for detecting secrets using the detect-secrets library.
|
134 |
+
"""
|
135 |
+
|
136 |
+
@staticmethod
|
137 |
+
def scan(text: str) -> dict[str, list[SecretsInfo]]:
|
138 |
+
"""
|
139 |
+
Scans the given text for secrets using the detect-secrets library.
|
140 |
+
|
141 |
+
Args:
|
142 |
+
text (str): The text to scan for secrets.
|
143 |
+
|
144 |
+
Returns:
|
145 |
+
dict[str, list[SecretsInfo]]: A dictionary where the keys are secret types and the values are lists of SecretsInfo objects.
|
146 |
+
"""
|
147 |
+
secrets = SecretsCollection()
|
148 |
+
temp_file = tempfile.NamedTemporaryFile(delete=False)
|
149 |
+
temp_file.write(text.encode("utf-8"))
|
150 |
+
temp_file.close()
|
151 |
+
|
152 |
+
with default_settings():
|
153 |
+
secrets.scan_file(str(temp_file.name))
|
154 |
+
|
155 |
+
unique_secrets = {}
|
156 |
+
for file in secrets.files:
|
157 |
+
for found_secret in secrets[file]:
|
158 |
+
if found_secret.secret_value is None:
|
159 |
+
continue
|
160 |
+
|
161 |
+
secret_type = found_secret.type
|
162 |
+
actual_secret = found_secret.secret_value
|
163 |
+
line_number = found_secret.line_number
|
164 |
+
|
165 |
+
if secret_type not in unique_secrets:
|
166 |
+
unique_secrets[secret_type] = []
|
167 |
+
|
168 |
+
unique_secrets[secret_type].append(
|
169 |
+
SecretsInfo(secret=actual_secret, line_number=line_number)
|
170 |
+
)
|
171 |
+
|
172 |
+
os.remove(temp_file.name)
|
173 |
+
return unique_secrets
|
174 |
+
|
175 |
+
@weave.op
|
176 |
+
def invoke(self, text: str) -> dict[str, list[SecretsInfo]]:
|
177 |
+
"""
|
178 |
+
Invokes the scan method to detect secrets in the given text.
|
179 |
+
|
180 |
+
Args:
|
181 |
+
text (str): The text to scan for secrets.
|
182 |
+
|
183 |
+
Returns:
|
184 |
+
dict[str, list[SecretsInfo]]: A dictionary where the keys are secret types and the values are lists of SecretsInfo objects.
|
185 |
+
"""
|
186 |
+
return self.scan(text)
|
187 |
+
|
188 |
+
|
189 |
+
class HyperScanModel(weave.Model):
|
190 |
+
"""
|
191 |
+
Model for detecting secrets using the Hyperscan library.
|
192 |
+
We use the Hyperscan library to scan for secrets using regex patterns.
|
193 |
+
The patterns are mined from https://github.com/mazen160/secrets-patterns-db
|
194 |
+
This model is used in conjunction with the DetectSecretsModel to improve the detection of secrets.
|
195 |
+
"""
|
196 |
+
|
197 |
+
_db: Any = PrivateAttr()
|
198 |
+
_pattern_map: dict[str, str] = PrivateAttr()
|
199 |
+
only_high_confidence: bool = False
|
200 |
+
ids: list[str] = []
|
201 |
+
|
202 |
+
def _load_patterns(self) -> dict[str, str]:
|
203 |
+
"""
|
204 |
+
Loads the patterns from a JSONL file.
|
205 |
+
|
206 |
+
Returns:
|
207 |
+
dict[str, str]: A dictionary where the keys are pattern names and the values are regex patterns.
|
208 |
+
"""
|
209 |
+
patterns = (
|
210 |
+
pathlib.Path(__file__).parent.resolve() / "secrets_patterns.jsonl"
|
211 |
+
).open()
|
212 |
+
patterns_list = [json.loads(line) for line in patterns]
|
213 |
+
if self.only_high_confidence:
|
214 |
+
patterns_list = [
|
215 |
+
pattern for pattern in patterns_list if pattern["confidence"] == "high"
|
216 |
+
]
|
217 |
+
return {pattern["name"]: pattern["regex"] for pattern in patterns_list}
|
218 |
+
|
219 |
+
def __init__(self, **kwargs: Any):
|
220 |
+
"""
|
221 |
+
Initializes the HyperScanModel instance.
|
222 |
+
"""
|
223 |
+
super().__init__(**kwargs)
|
224 |
+
|
225 |
+
def model_post_init(self, __context: Any) -> None:
|
226 |
+
"""
|
227 |
+
Post-initialization method to load patterns and compile the Hyperscan database.
|
228 |
+
"""
|
229 |
+
self._pattern_map = self._load_patterns()
|
230 |
+
self.ids = list(self._pattern_map.keys())
|
231 |
+
expressions = [pattern.encode() for pattern in self._pattern_map.values()]
|
232 |
+
self._db = hyperscan.Database()
|
233 |
+
self._db.compile(expressions=expressions, ids=list(range(len(expressions))))
|
234 |
+
|
235 |
+
def scan(self, text: str) -> dict[str, list[SecretsInfo]]:
|
236 |
+
"""
|
237 |
+
Scans the given text for secrets using the Hyperscan library.
|
238 |
+
|
239 |
+
Args:
|
240 |
+
text (str): The text to scan for secrets.
|
241 |
+
|
242 |
+
Returns:
|
243 |
+
dict[str, list[SecretsInfo]]: A dictionary where the keys are secret types and the values are lists of SecretsInfo objects.
|
244 |
+
"""
|
245 |
+
unique_secrets = {}
|
246 |
+
|
247 |
+
def on_match(idx, start, end, flags, context):
|
248 |
+
"""
|
249 |
+
Callback function for handling matches found by Hyperscan.
|
250 |
+
|
251 |
+
Args:
|
252 |
+
idx: The index of the matched pattern.
|
253 |
+
start: The start position of the match.
|
254 |
+
end: The end position of the match.
|
255 |
+
flags: The flags associated with the match.
|
256 |
+
context: The context provided to the scan method.
|
257 |
+
"""
|
258 |
+
secret = context["text"][start:end]
|
259 |
+
line_number = context["line_number"]
|
260 |
+
current_match = unique_secrets.setdefault(self.ids[idx], [])
|
261 |
+
|
262 |
+
if not current_match or len(secret) > len(current_match[0].secret):
|
263 |
+
unique_secrets[self.ids[idx]] = [
|
264 |
+
SecretsInfo(line_number=line_number, secret=secret)
|
265 |
+
]
|
266 |
+
|
267 |
+
for line_no, line in enumerate(text.splitlines(), start=1):
|
268 |
+
self._db.scan(
|
269 |
+
line.encode(),
|
270 |
+
match_event_handler=on_match,
|
271 |
+
context={"text": line, "line_number": line_no},
|
272 |
+
)
|
273 |
+
|
274 |
+
return unique_secrets
|
275 |
+
|
276 |
+
@weave.op
|
277 |
+
def invoke(self, text: str) -> dict[str, list[SecretsInfo]]:
|
278 |
+
"""
|
279 |
+
Invokes the scan method to detect secrets in the given text.
|
280 |
+
|
281 |
+
Args:
|
282 |
+
text (str): The text to scan for secrets.
|
283 |
+
|
284 |
+
Returns:
|
285 |
+
dict[str, list[SecretsInfo]]: A dictionary where the keys are secret types and the values are lists of SecretsInfo objects.
|
286 |
+
"""
|
287 |
+
return self.scan(text)
|
288 |
|
289 |
|
290 |
class SecretsDetectionGuardrail(Guardrail):
|
291 |
"""
|
292 |
+
Guardrail class for secrets detection using both detect-secrets and Hyperscan models.
|
|
|
|
|
293 |
|
294 |
Attributes:
|
295 |
+
redaction (REDACTION): The redaction mode to be applied.
|
296 |
+
_detect_secrets_model (Any): Instance of the DetectSecretsModel.
|
297 |
+
_hyperscan_model (Any): Instance of the HyperScanModel.
|
298 |
"""
|
299 |
|
|
|
|
|
300 |
redaction: REDACTION
|
301 |
+
_detect_secrets_model: Any = PrivateAttr()
|
302 |
+
_hyperscan_model: Any = PrivateAttr()
|
303 |
+
|
304 |
+
def model_post_init(self, __context: Any) -> None:
|
305 |
+
"""
|
306 |
+
Post-initialization method to initialize the detect-secrets and Hyperscan models.
|
307 |
+
"""
|
308 |
+
self._detect_secrets_model = DetectSecretsModel()
|
309 |
+
self._hyperscan_model = HyperScanModel()
|
310 |
|
311 |
def __init__(
|
312 |
self,
|
|
|
313 |
redaction: REDACTION = REDACTION.REDACT_ALL,
|
314 |
**kwargs,
|
315 |
):
|
316 |
"""
|
317 |
+
Initializes the SecretsDetectionGuardrail instance.
|
318 |
|
319 |
Args:
|
320 |
+
redaction (REDACTION): The redaction mode to be applied. Defaults to REDACTION.REDACT_ALL.
|
|
|
321 |
**kwargs: Additional keyword arguments.
|
322 |
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
323 |
super().__init__(
|
|
|
|
|
324 |
redaction=redaction,
|
325 |
)
|
326 |
|
327 |
+
def get_modified_value(
|
328 |
+
self, unique_secrets: dict[str, Any], lines: list[str]
|
329 |
+
) -> str:
|
330 |
+
"""
|
331 |
+
Redacts the detected secrets in the given lines of text.
|
332 |
+
|
333 |
+
Args:
|
334 |
+
unique_secrets (dict[str, Any]): Dictionary of detected secrets.
|
335 |
+
lines (list[str]): List of lines of text.
|
336 |
+
|
337 |
+
Returns:
|
338 |
+
str: The modified text with secrets redacted.
|
339 |
+
"""
|
340 |
+
for _, secrets_list in unique_secrets.items():
|
341 |
+
for secret_info in secrets_list:
|
342 |
+
secret = secret_info.secret
|
343 |
+
line_number = secret_info.line_number
|
344 |
+
lines[line_number - 1] = lines[line_number - 1].replace(
|
345 |
+
secret, redact_value(secret, self.redaction)
|
346 |
+
)
|
347 |
+
|
348 |
+
modified_value = "\n".join(lines)
|
349 |
+
return modified_value
|
350 |
+
|
351 |
+
def get_scan_result(
|
352 |
+
self, unique_secrets: dict[str, list[SecretsInfo]], lines: list[str]
|
353 |
+
) -> ScanResult | None:
|
354 |
+
"""
|
355 |
+
Generates a ScanResult based on the detected secrets.
|
356 |
+
|
357 |
+
Args:
|
358 |
+
unique_secrets (dict[str, list[SecretsInfo]]): Dictionary of detected secrets.
|
359 |
+
lines (list[str]): List of lines of text.
|
360 |
+
|
361 |
+
Returns:
|
362 |
+
ScanResult | None: The scan result if secrets are detected, otherwise None.
|
363 |
+
"""
|
364 |
+
if unique_secrets:
|
365 |
+
modified_value = self.get_modified_value(unique_secrets, lines)
|
366 |
+
detected_secrets = {
|
367 |
+
k: [i.secret for i in v] for k, v in unique_secrets.items()
|
368 |
+
}
|
369 |
+
|
370 |
+
return ScanResult(
|
371 |
+
**{
|
372 |
+
"detected_secrets": detected_secrets,
|
373 |
+
"modified_prompt": modified_value,
|
374 |
+
"has_secret": True,
|
375 |
+
"risk_score": 1.0,
|
376 |
+
}
|
377 |
+
)
|
378 |
+
return None
|
379 |
+
|
380 |
+
def scan(self, prompt: str) -> ScanResult:
|
381 |
+
"""
|
382 |
+
Scans the given prompt for secrets using both detect-secrets and Hyperscan models.
|
383 |
+
|
384 |
+
Args:
|
385 |
+
prompt (str): The text to scan for secrets.
|
386 |
+
|
387 |
+
Returns:
|
388 |
+
ScanResult: The scan result with detected secrets and redacted text.
|
389 |
+
"""
|
390 |
+
if prompt.strip() == "":
|
391 |
+
return ScanResult(
|
392 |
+
**{
|
393 |
+
"detected_secrets": None,
|
394 |
+
"modified_prompt": prompt,
|
395 |
+
"has_secret": False,
|
396 |
+
"risk_score": 0.0,
|
397 |
+
}
|
398 |
+
)
|
399 |
+
|
400 |
+
unique_secrets = self._detect_secrets_model.invoke(text=prompt)
|
401 |
+
results = self.get_scan_result(unique_secrets, prompt.splitlines())
|
402 |
+
if results:
|
403 |
+
return results
|
404 |
+
|
405 |
+
unique_secrets = self._hyperscan_model.invoke(text=prompt)
|
406 |
+
results = self.get_scan_result(unique_secrets, prompt.splitlines())
|
407 |
+
if results:
|
408 |
+
results.risk_score = 0.5
|
409 |
+
return results
|
410 |
+
|
411 |
+
return ScanResult(
|
412 |
+
**{
|
413 |
+
"detected_secrets": None,
|
414 |
+
"modified_prompt": prompt,
|
415 |
+
"has_secret": False,
|
416 |
+
"risk_score": 0.0,
|
417 |
+
}
|
418 |
+
)
|
419 |
+
|
420 |
+
@weave.op
|
421 |
def guard(
|
422 |
self,
|
423 |
prompt: str,
|
|
|
425 |
**kwargs,
|
426 |
) -> SecretsDetectionResponse | SecretsDetectionResponse:
|
427 |
"""
|
428 |
+
Guards the given prompt by scanning for secrets and optionally returning detected secrets.
|
429 |
|
430 |
Args:
|
431 |
+
prompt (str): The text to scan for secrets.
|
432 |
+
return_detected_secrets (bool): Whether to return detected secrets in the response. Defaults to True.
|
433 |
+
**kwargs: Additional keyword arguments.
|
434 |
|
435 |
Returns:
|
436 |
+
SecretsDetectionResponse | SecretsDetectionSimpleResponse: The response with scan results and redacted text.
|
437 |
"""
|
438 |
+
results = self.scan(prompt)
|
439 |
|
440 |
explanation_parts = []
|
441 |
+
if results.has_secret:
|
442 |
explanation_parts.append("Found the following secrets in the text:")
|
443 |
+
for secret_type, matches in results.detected_secrets.items():
|
444 |
explanation_parts.append(f"- {secret_type}: {len(matches)} instance(s)")
|
445 |
else:
|
446 |
explanation_parts.append("No secrets detected in the text.")
|
447 |
|
|
|
|
|
|
|
|
|
|
|
448 |
if return_detected_secrets:
|
449 |
return SecretsDetectionResponse(
|
450 |
+
contains_secrets=results.has_secret,
|
451 |
+
detected_secrets=results.detected_secrets,
|
452 |
explanation="\n".join(explanation_parts),
|
453 |
+
redacted_text=results.modified_prompt,
|
454 |
+
risk_score=results.risk_score,
|
455 |
)
|
456 |
else:
|
457 |
return SecretsDetectionSimpleResponse(
|
458 |
+
contains_secrets=not results.has_secret,
|
459 |
explanation="\n".join(explanation_parts),
|
460 |
+
redacted_text=results.modified_prompt,
|
461 |
+
risk_score=results.risk_score,
|
462 |
)
|
guardrails_genie/guardrails/secrets_detection/secrets_patterns.jsonl
CHANGED
The diff for this file is too large to render.
See raw diff
|
|
pyproject.toml
CHANGED
@@ -25,6 +25,10 @@ dependencies = [
|
|
25 |
"presidio-analyzer>=2.2.355",
|
26 |
"presidio-anonymizer>=2.2.355",
|
27 |
"instructor>=1.7.0",
|
|
|
|
|
|
|
|
|
28 |
]
|
29 |
|
30 |
[project.optional-dependencies]
|
|
|
25 |
"presidio-analyzer>=2.2.355",
|
26 |
"presidio-anonymizer>=2.2.355",
|
27 |
"instructor>=1.7.0",
|
28 |
+
"numpy<2.0.0",
|
29 |
+
"gibberish-detector>=0.1.1",
|
30 |
+
"detect-secrets>=1.5.0",
|
31 |
+
"hyperscan>=0.7.8"
|
32 |
]
|
33 |
|
34 |
[project.optional-dependencies]
|
tests/guardrails_genie/guardrails/test_secrets_detection.py
CHANGED
@@ -2,15 +2,14 @@ import hashlib
|
|
2 |
import re
|
3 |
|
4 |
import pytest
|
5 |
-
from hypothesis import
|
6 |
|
|
|
7 |
from guardrails_genie.guardrails.secrets_detection import (
|
8 |
-
DEFAULT_SECRETS_PATTERNS,
|
9 |
SecretsDetectionSimpleResponse,
|
10 |
SecretsDetectionResponse,
|
11 |
-
SecretsDetectionGuardrail,
|
12 |
REDACTION,
|
13 |
-
|
14 |
)
|
15 |
|
16 |
|
@@ -18,7 +17,7 @@ from guardrails_genie.guardrails.secrets_detection import (
|
|
18 |
def mock_secrets_guard(monkeypatch):
|
19 |
def _mock_guard(*args, **kwargs):
|
20 |
prompt = kwargs.get("prompt")
|
21 |
-
return_detected_types = kwargs.get("
|
22 |
|
23 |
if "safe text" in prompt:
|
24 |
if return_detected_types:
|
@@ -27,12 +26,14 @@ def mock_secrets_guard(monkeypatch):
|
|
27 |
explanation="No secrets detected in the text.",
|
28 |
detected_secrets={},
|
29 |
redacted_text=prompt,
|
|
|
30 |
)
|
31 |
else:
|
32 |
return SecretsDetectionSimpleResponse(
|
33 |
contains_secrets=False,
|
34 |
explanation="No secrets detected in the text.",
|
35 |
redacted_text=prompt,
|
|
|
36 |
)
|
37 |
else:
|
38 |
if return_detected_types:
|
@@ -41,12 +42,14 @@ def mock_secrets_guard(monkeypatch):
|
|
41 |
explanation="The output contains secrets.",
|
42 |
detected_secrets={"secrets": ["API_KEY"]},
|
43 |
redacted_text="My secret key is [REDACTED:]************[:REDACTED]",
|
|
|
44 |
)
|
45 |
else:
|
46 |
return SecretsDetectionSimpleResponse(
|
47 |
contains_secrets=True,
|
48 |
explanation="The output contains secrets.",
|
49 |
redacted_text="My secret key is [REDACTED:]************[:REDACTED]",
|
|
|
50 |
)
|
51 |
|
52 |
monkeypatch.setattr(
|
@@ -56,32 +59,22 @@ def mock_secrets_guard(monkeypatch):
|
|
56 |
|
57 |
|
58 |
def test_redact_partial():
|
59 |
-
text = "
|
60 |
-
|
61 |
-
redacted_text
|
62 |
-
assert redacted_text == "My secret key is [REDACTED:]AB..KL[:REDACTED]"
|
63 |
|
64 |
|
65 |
def test_redact_all():
|
66 |
-
text = "
|
67 |
-
|
68 |
-
redacted_text
|
69 |
-
assert redacted_text == "My secret key is [REDACTED:]************[:REDACTED]"
|
70 |
|
71 |
|
72 |
def test_redact_hash():
|
73 |
-
text = "
|
74 |
-
|
75 |
-
|
76 |
-
redacted_text
|
77 |
-
assert redacted_text == f"My secret key is [REDACTED:]{hashed_value}[:REDACTED]"
|
78 |
-
|
79 |
-
|
80 |
-
def test_redact_no_match():
|
81 |
-
text = "My secret key is ABCDEFGHIJKL"
|
82 |
-
matches = ["XYZ"]
|
83 |
-
redacted_text = redact(text, matches, REDACTION.REDACT_ALL)
|
84 |
-
assert redacted_text == text
|
85 |
|
86 |
|
87 |
def test_secrets_detection_guardrail_detect_types(mock_secrets_guard):
|
@@ -134,16 +127,15 @@ def test_secrets_detection_guardrail_no_secrets(mock_secrets_guard):
|
|
134 |
assert result.redacted_text == prompt
|
135 |
|
136 |
|
137 |
-
# Create a strategy to generate strings that match the patterns
|
138 |
def pattern_strategy(pattern):
|
139 |
return st.from_regex(re.compile(pattern), fullmatch=True)
|
140 |
|
141 |
|
142 |
-
@settings(deadline=1000)
|
143 |
-
@given(pattern_strategy(
|
144 |
def test_specific_pattern_guardrail(text):
|
145 |
guardrail = SecretsDetectionGuardrail(redaction=REDACTION.REDACT_ALL)
|
146 |
result = guardrail.guard(prompt=text, return_detected_secrets=True)
|
147 |
|
148 |
assert result.contains_secrets is True
|
149 |
-
assert "
|
|
|
2 |
import re
|
3 |
|
4 |
import pytest
|
5 |
+
from hypothesis import strategies as st, given, settings
|
6 |
|
7 |
+
from guardrails_genie.guardrails import SecretsDetectionGuardrail
|
8 |
from guardrails_genie.guardrails.secrets_detection import (
|
|
|
9 |
SecretsDetectionSimpleResponse,
|
10 |
SecretsDetectionResponse,
|
|
|
11 |
REDACTION,
|
12 |
+
redact_value,
|
13 |
)
|
14 |
|
15 |
|
|
|
17 |
def mock_secrets_guard(monkeypatch):
|
18 |
def _mock_guard(*args, **kwargs):
|
19 |
prompt = kwargs.get("prompt")
|
20 |
+
return_detected_types = kwargs.get("return_detected_secrets")
|
21 |
|
22 |
if "safe text" in prompt:
|
23 |
if return_detected_types:
|
|
|
26 |
explanation="No secrets detected in the text.",
|
27 |
detected_secrets={},
|
28 |
redacted_text=prompt,
|
29 |
+
risk_score=0.0,
|
30 |
)
|
31 |
else:
|
32 |
return SecretsDetectionSimpleResponse(
|
33 |
contains_secrets=False,
|
34 |
explanation="No secrets detected in the text.",
|
35 |
redacted_text=prompt,
|
36 |
+
risk_score=0.0,
|
37 |
)
|
38 |
else:
|
39 |
if return_detected_types:
|
|
|
42 |
explanation="The output contains secrets.",
|
43 |
detected_secrets={"secrets": ["API_KEY"]},
|
44 |
redacted_text="My secret key is [REDACTED:]************[:REDACTED]",
|
45 |
+
risk_score=1.0,
|
46 |
)
|
47 |
else:
|
48 |
return SecretsDetectionSimpleResponse(
|
49 |
contains_secrets=True,
|
50 |
explanation="The output contains secrets.",
|
51 |
redacted_text="My secret key is [REDACTED:]************[:REDACTED]",
|
52 |
+
risk_score=1.0,
|
53 |
)
|
54 |
|
55 |
monkeypatch.setattr(
|
|
|
59 |
|
60 |
|
61 |
def test_redact_partial():
|
62 |
+
text = "ABCDEFGHIJKL"
|
63 |
+
redacted_text = redact_value(text, REDACTION.REDACT_PARTIAL)
|
64 |
+
assert redacted_text == "[REDACTED:]AB..KL[:REDACTED]"
|
|
|
65 |
|
66 |
|
67 |
def test_redact_all():
|
68 |
+
text = "ABCDEFGHIJKL"
|
69 |
+
redacted_text = redact_value(text, REDACTION.REDACT_ALL)
|
70 |
+
assert redacted_text == "[REDACTED:]************[:REDACTED]"
|
|
|
71 |
|
72 |
|
73 |
def test_redact_hash():
|
74 |
+
text = "ABCDEFGHIJKL"
|
75 |
+
hashed_value = hashlib.md5(text.encode()).hexdigest()
|
76 |
+
redacted_text = redact_value(text, REDACTION.REDACT_HASH)
|
77 |
+
assert redacted_text == f"[REDACTED:]{hashed_value}[:REDACTED]"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
78 |
|
79 |
|
80 |
def test_secrets_detection_guardrail_detect_types(mock_secrets_guard):
|
|
|
127 |
assert result.redacted_text == prompt
|
128 |
|
129 |
|
|
|
130 |
def pattern_strategy(pattern):
|
131 |
return st.from_regex(re.compile(pattern), fullmatch=True)
|
132 |
|
133 |
|
134 |
+
@settings(deadline=1000)
|
135 |
+
@given(pattern_strategy(r"AKIA[0-9A-Z]{16}"))
|
136 |
def test_specific_pattern_guardrail(text):
|
137 |
guardrail = SecretsDetectionGuardrail(redaction=REDACTION.REDACT_ALL)
|
138 |
result = guardrail.guard(prompt=text, return_detected_secrets=True)
|
139 |
|
140 |
assert result.contains_secrets is True
|
141 |
+
assert "AWS Access Key" in result.detected_secrets
|