Create app.py
Browse files
app.py
ADDED
@@ -0,0 +1,380 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
|
2 |
+
import gradio as gr
|
3 |
+
import datetime
|
4 |
+
import pandas as pd
|
5 |
+
from groq import Groq
|
6 |
+
from sentence_transformers import SentenceTransformer
|
7 |
+
import chromadb
|
8 |
+
from chromadb.config import Settings
|
9 |
+
import hashlib
|
10 |
+
from typing import TypedDict, Optional, List
|
11 |
+
from langgraph.graph import StateGraph, END
|
12 |
+
import json
|
13 |
+
import tempfile
|
14 |
+
import subprocess
|
15 |
+
import os
|
16 |
+
|
17 |
+
from google.colab import userdata
|
18 |
+
api_key_coder =userdata.get('coder')
|
19 |
+
|
20 |
+
# ---------------------------
|
21 |
+
# 1. Define State
|
22 |
+
# ---------------------------
|
23 |
+
class CodeAssistantState(TypedDict):
|
24 |
+
user_input: str
|
25 |
+
similar_examples: Optional[List[str]]
|
26 |
+
generated_code: Optional[str]
|
27 |
+
error: Optional[str]
|
28 |
+
task_type: Optional[str] # "generate" or "explain"
|
29 |
+
evaluation_result: Optional[str]
|
30 |
+
|
31 |
+
# ---------------------------
|
32 |
+
# 2. Initialize Components
|
33 |
+
# ---------------------------
|
34 |
+
# Load data
|
35 |
+
df = pd.read_parquet("hf://datasets/openai/openai_humaneval/openai_humaneval/test-00000-of-00001.parquet")
|
36 |
+
extracted_data = df[['task_id', 'prompt', 'canonical_solution']]
|
37 |
+
|
38 |
+
# Initialize models and DB
|
39 |
+
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
|
40 |
+
groq_client = Groq(api_key=api_key_coder) # استبدل بمفتاح API الفعلي
|
41 |
+
|
42 |
+
client = chromadb.Client(Settings(
|
43 |
+
anonymized_telemetry=False,
|
44 |
+
persist_directory="rag_db"
|
45 |
+
))
|
46 |
+
collection = client.get_or_create_collection(
|
47 |
+
name="code_examples",
|
48 |
+
metadata={"hnsw:space": "cosine"}
|
49 |
+
)
|
50 |
+
|
51 |
+
# ---------------------------
|
52 |
+
# 3. Define Nodes
|
53 |
+
# ---------------------------
|
54 |
+
|
55 |
+
def initialize_db(state: CodeAssistantState):
|
56 |
+
try:
|
57 |
+
for _, row in extracted_data.iterrows():
|
58 |
+
embedding = embedding_model.encode([row['prompt'].strip()])[0]
|
59 |
+
doc_id = hashlib.md5(row['prompt'].encode()).hexdigest()
|
60 |
+
collection.add(
|
61 |
+
documents=[row['canonical_solution'].strip()],
|
62 |
+
metadatas=[{"prompt": row['prompt'], "type": "code_example"}],
|
63 |
+
ids=[doc_id],
|
64 |
+
embeddings=[embedding]
|
65 |
+
)
|
66 |
+
return state
|
67 |
+
except Exception as e:
|
68 |
+
state["error"] = f"DB initialization failed: {str(e)}"
|
69 |
+
return state
|
70 |
+
|
71 |
+
def retrieve_examples(state: CodeAssistantState):
|
72 |
+
try:
|
73 |
+
embedding = embedding_model.encode([state["user_input"]])[0]
|
74 |
+
results = collection.query(
|
75 |
+
query_embeddings=[embedding],
|
76 |
+
n_results=2
|
77 |
+
)
|
78 |
+
state["similar_examples"] = results['documents'][0] if results['documents'] else None
|
79 |
+
return state
|
80 |
+
except Exception as e:
|
81 |
+
state["error"] = f"Retrieval failed: {str(e)}"
|
82 |
+
return state
|
83 |
+
|
84 |
+
def classify_task_llm(state: CodeAssistantState) -> CodeAssistantState:
|
85 |
+
if not isinstance(state, dict):
|
86 |
+
raise ValueError("State must be a dictionary")
|
87 |
+
|
88 |
+
if "user_input" not in state or not state["user_input"].strip():
|
89 |
+
state["error"] = "No user input provided for classification"
|
90 |
+
state["task_type"] = "generate" # Default to code generation
|
91 |
+
return state
|
92 |
+
|
93 |
+
try:
|
94 |
+
prompt = f"""You are a helpful code assistant. Classify the user request as one of the following tasks:
|
95 |
+
- "generate": if the user wants to write or generate code
|
96 |
+
- "explain": if the user wants to understand what a code snippet does
|
97 |
+
- "test": if the user wants to test existing code
|
98 |
+
Return ONLY a JSON object in the format: {{"task": "...", "user_input": "..."}} — no explanation.
|
99 |
+
User request: {state["user_input"]}
|
100 |
+
"""
|
101 |
+
completion = groq_client.chat.completions.create(
|
102 |
+
model="llama3-70b-8192",
|
103 |
+
messages=[
|
104 |
+
{"role": "system", "content": "Classify code-related user input. Respond with ONLY JSON."},
|
105 |
+
{"role": "user", "content": prompt}
|
106 |
+
],
|
107 |
+
temperature=0.3,
|
108 |
+
max_tokens=200,
|
109 |
+
response_format={"type": "json_object"}
|
110 |
+
)
|
111 |
+
|
112 |
+
content = completion.choices[0].message.content.strip()
|
113 |
+
|
114 |
+
try:
|
115 |
+
result = json.loads(content)
|
116 |
+
if not isinstance(result, dict):
|
117 |
+
raise ValueError("Response is not a JSON object")
|
118 |
+
except (json.JSONDecodeError, ValueError) as e:
|
119 |
+
state["error"] = f"Invalid response format from LLM: {str(e)}. Content: {content}"
|
120 |
+
state["task_type"] = "generate" # Fallback to code generation
|
121 |
+
return state
|
122 |
+
|
123 |
+
task_type = result.get("task", "").lower()
|
124 |
+
if task_type not in ["generate", "explain", "test"]:
|
125 |
+
state["error"] = f"Invalid task type received: {task_type}"
|
126 |
+
task_type = "generate" # Default to generation
|
127 |
+
|
128 |
+
state["task_type"] = task_type
|
129 |
+
state["user_input"] = result.get("user_input", state["user_input"])
|
130 |
+
return state
|
131 |
+
|
132 |
+
except Exception as e:
|
133 |
+
state["error"] = f"LLM-based classification failed: {str(e)}"
|
134 |
+
state["task_type"] = "generate" # Fallback to code generation
|
135 |
+
return state
|
136 |
+
|
137 |
+
def test_code(state: CodeAssistantState) -> CodeAssistantState:
|
138 |
+
if not isinstance(state, dict):
|
139 |
+
raise ValueError("State must be a dictionary")
|
140 |
+
|
141 |
+
if "user_input" not in state or not state["user_input"].strip():
|
142 |
+
state["error"] = "Please provide the code you want to test"
|
143 |
+
return state
|
144 |
+
|
145 |
+
try:
|
146 |
+
messages = [
|
147 |
+
{"role": "system", "content": """You are a Python testing expert. Generate unit tests for the provided code.
|
148 |
+
Return the test code in the following format:
|
149 |
+
```python
|
150 |
+
# Test code here
|
151 |
+
```"""},
|
152 |
+
{"role": "user", "content": f"Generate comprehensive unit tests for this Python code:\n\n{state['user_input']}"}
|
153 |
+
]
|
154 |
+
|
155 |
+
completion = groq_client.chat.completions.create(
|
156 |
+
model="llama-3.3-70b-versatile",
|
157 |
+
messages=messages,
|
158 |
+
temperature=0.5,
|
159 |
+
max_tokens=2048,
|
160 |
+
)
|
161 |
+
|
162 |
+
test_code = completion.choices[0].message.content
|
163 |
+
if test_code.startswith('```python'):
|
164 |
+
test_code = test_code[9:-3] if test_code.endswith('```') else test_code[9:]
|
165 |
+
elif test_code.startswith('```'):
|
166 |
+
test_code = test_code[3:-3] if test_code.endswith('```') else test_code[3:]
|
167 |
+
|
168 |
+
state["generated_tests"] = test_code.strip()
|
169 |
+
state["metadata"] = {
|
170 |
+
"model": "llama-3.3-70b-versatile",
|
171 |
+
"timestamp": datetime.datetime.now().isoformat()
|
172 |
+
}
|
173 |
+
|
174 |
+
# Execute the tests and capture results
|
175 |
+
try:
|
176 |
+
# Create a temporary file to store the original code
|
177 |
+
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as code_file:
|
178 |
+
code_file.write(state['user_input'])
|
179 |
+
code_file_path = code_file.name
|
180 |
+
|
181 |
+
# Create a temporary file to store the test code
|
182 |
+
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as test_file:
|
183 |
+
test_file.write(test_code)
|
184 |
+
test_file_path = test_file.name
|
185 |
+
|
186 |
+
# Run the tests and capture output
|
187 |
+
result = subprocess.run(
|
188 |
+
['python', test_file_path],
|
189 |
+
capture_output=True,
|
190 |
+
text=True,
|
191 |
+
timeout=10
|
192 |
+
)
|
193 |
+
|
194 |
+
state["test_results"] = {
|
195 |
+
"returncode": result.returncode,
|
196 |
+
"stdout": result.stdout,
|
197 |
+
"stderr": result.stderr
|
198 |
+
}
|
199 |
+
|
200 |
+
# Clean up temporary files
|
201 |
+
os.unlink(code_file_path)
|
202 |
+
os.unlink(test_file_path)
|
203 |
+
|
204 |
+
except Exception as e:
|
205 |
+
state["test_error"] = f"Error executing tests: {str(e)}"
|
206 |
+
|
207 |
+
print(f"\nGenerated Tests:\n{test_code.strip()}\n")
|
208 |
+
if "test_results" in state:
|
209 |
+
print(f"Test Execution Results:\n{state['test_results']['stdout']}")
|
210 |
+
if state["test_results"]["stderr"]:
|
211 |
+
print(f"Errors:\n{state['test_results']['stderr']}")
|
212 |
+
|
213 |
+
return state
|
214 |
+
|
215 |
+
except Exception as e:
|
216 |
+
state["error"] = f"Error generating tests: {str(e)}"
|
217 |
+
return state
|
218 |
+
|
219 |
+
def generate_code(state: CodeAssistantState) -> CodeAssistantState:
|
220 |
+
if not isinstance(state, dict):
|
221 |
+
raise ValueError("State must be a dictionary")
|
222 |
+
|
223 |
+
if "user_input" not in state or not state["user_input"].strip():
|
224 |
+
state["error"] = "Please enter your code request"
|
225 |
+
return state
|
226 |
+
|
227 |
+
try:
|
228 |
+
messages = [
|
229 |
+
{"role": "system", "content": "You are a Python coding assistant. Return only clean, production-ready code."},
|
230 |
+
{"role": "user", "content": state["user_input"].strip()}
|
231 |
+
]
|
232 |
+
|
233 |
+
completion = groq_client.chat.completions.create(
|
234 |
+
model="llama-3.3-70b-versatile",
|
235 |
+
messages=messages,
|
236 |
+
temperature=0.7,
|
237 |
+
max_tokens=2048,
|
238 |
+
)
|
239 |
+
|
240 |
+
code = completion.choices[0].message.content
|
241 |
+
if code.startswith('```python'):
|
242 |
+
code = code[9:-3] if code.endswith('```') else code[9:]
|
243 |
+
elif code.startswith('```'):
|
244 |
+
code = code[3:-3] if code.endswith('```') else code[3:]
|
245 |
+
|
246 |
+
state["generated_code"] = code.strip()
|
247 |
+
state["metadata"] = {
|
248 |
+
"model": "llama-3.3-70b-versatile",
|
249 |
+
"timestamp": datetime.datetime.now().isoformat()
|
250 |
+
}
|
251 |
+
|
252 |
+
# سطر طباعة النتيجة المضافة
|
253 |
+
print(f"\nGenerated Code:\n{code.strip()}\n")
|
254 |
+
|
255 |
+
return state
|
256 |
+
|
257 |
+
except Exception as e:
|
258 |
+
state["error"] = f"Error generating code: {str(e)}"
|
259 |
+
return state
|
260 |
+
|
261 |
+
def explain_code(state: CodeAssistantState) -> CodeAssistantState:
|
262 |
+
try:
|
263 |
+
messages = [
|
264 |
+
{"role": "system", "content": "You are a Python expert. Explain what the following code does in plain language."},
|
265 |
+
{"role": "user", "content": state["user_input"].strip()}
|
266 |
+
]
|
267 |
+
|
268 |
+
completion = groq_client.chat.completions.create(
|
269 |
+
model="llama-3.3-70b-versatile",
|
270 |
+
messages=messages,
|
271 |
+
temperature=0.5,
|
272 |
+
max_tokens=1024
|
273 |
+
)
|
274 |
+
|
275 |
+
explanation = completion.choices[0].message.content.strip()
|
276 |
+
state["generated_code"] = explanation
|
277 |
+
state["metadata"] = {
|
278 |
+
"model": "llama-3.3-70b-versatile",
|
279 |
+
"timestamp": datetime.datetime.now().isoformat()
|
280 |
+
}
|
281 |
+
|
282 |
+
# سطر طباعة النتيجة المضافة
|
283 |
+
print(f"Explanation:\n{explanation}")
|
284 |
+
|
285 |
+
return state
|
286 |
+
|
287 |
+
except Exception as e:
|
288 |
+
state["error"] = f"Error explaining code: {str(e)}"
|
289 |
+
return state
|
290 |
+
|
291 |
+
# ---------------------------
|
292 |
+
# 4. Build StateGraph Workflow (محدث)
|
293 |
+
# ---------------------------
|
294 |
+
workflow = StateGraph(CodeAssistantState)
|
295 |
+
|
296 |
+
# إضافة جميع العقد بما فيها العقدة الجديدة
|
297 |
+
workflow.add_node("initialize_db", initialize_db)
|
298 |
+
workflow.add_node("retrieve_examples", retrieve_examples)
|
299 |
+
workflow.add_node("classify_task", classify_task_llm)
|
300 |
+
workflow.add_node("generate_code", generate_code)
|
301 |
+
workflow.add_node("explain_code", explain_code)
|
302 |
+
workflow.add_node("test_code", test_code) # العقدة الجديدة
|
303 |
+
|
304 |
+
# تحديد نقطة البداية والروابط الأساسية
|
305 |
+
workflow.set_entry_point("initialize_db")
|
306 |
+
workflow.add_edge("initialize_db", "retrieve_examples")
|
307 |
+
workflow.add_edge("retrieve_examples", "classify_task")
|
308 |
+
|
309 |
+
# تحديث الروابط الشرطية لتشمل خيار الاختبار
|
310 |
+
workflow.add_conditional_edges(
|
311 |
+
"classify_task",
|
312 |
+
lambda state: state["task_type"],
|
313 |
+
{
|
314 |
+
"generate": "generate_code",
|
315 |
+
"explain": "explain_code",
|
316 |
+
"test": "test_code" # الرابط الجديد
|
317 |
+
}
|
318 |
+
)
|
319 |
+
|
320 |
+
# إضافة روابط النهاية لجميع العقد
|
321 |
+
workflow.add_edge("generate_code", END)
|
322 |
+
workflow.add_edge("explain_code", END)
|
323 |
+
workflow.add_edge("test_code", END) # الرابط الجديد
|
324 |
+
|
325 |
+
# تجميع التدفق النهائي
|
326 |
+
app_workflow = workflow.compile()
|
327 |
+
|
328 |
+
# ---------------------------
|
329 |
+
# 5. Create Gradio Interface
|
330 |
+
# ---------------------------
|
331 |
+
def process_input(user_input: str):
|
332 |
+
"""Function that will be called by Gradio to process user input"""
|
333 |
+
initial_state = {
|
334 |
+
"user_input": user_input,
|
335 |
+
"similar_examples": None,
|
336 |
+
"generated_code": None,
|
337 |
+
"error": None,
|
338 |
+
"task_type": None
|
339 |
+
}
|
340 |
+
|
341 |
+
result = app_workflow.invoke(initial_state)
|
342 |
+
|
343 |
+
if result.get("error"):
|
344 |
+
return f"Error: {result['error']}"
|
345 |
+
|
346 |
+
if result["task_type"] == "generate":
|
347 |
+
return f"Generated Code:\n\n{result['generated_code']}"
|
348 |
+
else:
|
349 |
+
return f"Code Explanation:\n\n{result['generated_code']}"
|
350 |
+
|
351 |
+
# تعريف واجهة Gradio
|
352 |
+
# Define Gradio interface
|
353 |
+
with gr.Blocks(title="Smart Code Assistant") as demo:
|
354 |
+
gr.Markdown("""
|
355 |
+
# Smart Code Assistant
|
356 |
+
Enter your request either to generate new code or to explain existing code
|
357 |
+
""")
|
358 |
+
|
359 |
+
with gr.Row():
|
360 |
+
input_text = gr.Textbox(label="Enter your request", placeholder="Example: Write a function to add two numbers... or Explain this code...")
|
361 |
+
output_text = gr.Textbox(label="Result", interactive=False)
|
362 |
+
|
363 |
+
submit_btn = gr.Button("Execute")
|
364 |
+
submit_btn.click(fn=process_input, inputs=input_text, outputs=output_text)
|
365 |
+
|
366 |
+
# Quick examples
|
367 |
+
gr.Examples(
|
368 |
+
examples=[
|
369 |
+
["Write a Python function to add two numbers"],
|
370 |
+
["Explain this code: for i in range(5): print(i)"],
|
371 |
+
["Create a function to convert temperature from Fahrenheit to Celsius"],
|
372 |
+
["test for i in range(3): print('Hello from test', i)"]
|
373 |
+
],
|
374 |
+
|
375 |
+
inputs=input_text
|
376 |
+
)
|
377 |
+
|
378 |
+
# تشغيل الواجهة
|
379 |
+
if __name__ == "__main__":
|
380 |
+
demo.launch()
|