uvpatel7271 committed on
Commit
9159c06
·
1 Parent(s): 3ba9e4a

added code modularity

Browse files
analyzers/__init__.py ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Domain-specific analyzers for multi-domain code understanding."""
2
+
3
+ from .dsa_analyzer import analyze_dsa_code
4
+ from .ds_analyzer import analyze_data_science_code
5
+ from .ml_analyzer import analyze_ml_code
6
+ from .web_analyzer import analyze_web_code
7
+
8
+ __all__ = [
9
+ "analyze_dsa_code",
10
+ "analyze_data_science_code",
11
+ "analyze_ml_code",
12
+ "analyze_web_code",
13
+ ]
analyzers/ds_analyzer.py ADDED
@@ -0,0 +1,56 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Analyzer for data-science oriented Python code."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from typing import Any, Dict
6
+
7
+ from schemas.response import AnalysisIssue, DomainAnalysis
8
+
9
+
10
def analyze_data_science_code(code: str, parsed: Dict[str, Any], complexity: Dict[str, Any]) -> DomainAnalysis:
    """Inspect pandas and numpy code for vectorization and leakage concerns.

    Returns a DomainAnalysis whose score starts at a neutral baseline and is
    reduced for each detected anti-pattern, floored at 0.05.
    """

    problems: list = []
    advice: list = []
    rating = 0.72

    # Row-wise iteration is the most common pandas performance trap.
    iterates_rows = "iterrows(" in code or "itertuples(" in code
    if iterates_rows:
        problems.append(
            AnalysisIssue(
                title="Row-wise dataframe iteration detected",
                severity="medium",
                description="Looping through dataframe rows is usually slower and less scalable than vectorized operations.",
            )
        )
        advice.append("Use vectorized pandas or numpy expressions instead of row-wise iteration.")
        rating -= 0.18

    if "inplace=True" in code:
        advice.append("Avoid inplace mutation to keep data pipelines easier to reason about and test.")
        rating -= 0.05

    # Fitting transforms without an explicit split is a classic leakage smell.
    if "fit_transform(" in code and "train_test_split" not in code:
        problems.append(
            AnalysisIssue(
                title="Potential data leakage risk",
                severity="high",
                description="Feature transforms appear before an explicit train/test split.",
            )
        )
        advice.append("Split train and validation data before fitting stateful preprocessing steps.")
        rating -= 0.2

    if not advice:
        advice.append("Add schema assumptions and null-handling checks for production data quality.")

    return DomainAnalysis(
        domain="data_science",
        domain_score=max(0.05, round(rating, 4)),
        issues=problems,
        suggestions=advice,
        highlights={
            "vectorization_risk": float(iterates_rows),
            "time_complexity": complexity["time_complexity"],
            "uses_pandas": float(parsed.get("uses_pandas", False)),
        },
    )
analyzers/dsa_analyzer.py ADDED
@@ -0,0 +1,48 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Analyzer for DSA and competitive-programming style Python code."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from typing import Any, Dict
6
+
7
+ from schemas.response import AnalysisIssue, DomainAnalysis
8
+
9
+
10
def analyze_dsa_code(code: str, parsed: Dict[str, Any], complexity: Dict[str, Any]) -> DomainAnalysis:
    """Inspect algorithmic code for brute-force patterns and efficiency risks.

    Scores start at a neutral baseline and drop for each detected risk; the
    final score is floored at 0.05.
    """

    problems: list = []
    advice: list = []
    rating = 0.7

    # Depth >= 2 usually means the input is rescanned once per element.
    if parsed.get("max_loop_depth", 0) >= 2:
        problems.append(
            AnalysisIssue(
                title="Nested loops suggest brute-force behavior",
                severity="medium",
                description="The implementation scans the input multiple times, which is often avoidable in DSA problems.",
            )
        )
        advice.append("Consider replacing nested scans with a hashmap, prefix table, or sorted search strategy.")
        rating -= 0.15

    if parsed.get("uses_recursion"):
        advice.append("Verify recursion depth and add memoization or iterative conversion if the input size can grow.")
        rating -= 0.05

    # Sorting is flagged for review but carries no score penalty.
    if "sorted(" in code or ".sort(" in code:
        advice.append("Sorting is acceptable here, but validate whether a direct O(n) pass can remove the sort.")

    if not advice:
        advice.append("Document the intended time complexity and add edge-case checks for empty input and duplicates.")

    return DomainAnalysis(
        domain="dsa",
        domain_score=max(0.05, round(rating, 4)),
        issues=problems,
        suggestions=advice,
        highlights={
            "time_complexity": complexity["time_complexity"],
            "space_complexity": complexity["space_complexity"],
            "max_loop_depth": float(parsed.get("max_loop_depth", 0)),
        },
    )
analyzers/ml_analyzer.py ADDED
@@ -0,0 +1,61 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Analyzer for machine-learning and deep-learning code."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from typing import Any, Dict
6
+
7
+ from schemas.response import AnalysisIssue, DomainAnalysis
8
+
9
+
10
def analyze_ml_code(code: str, parsed: Dict[str, Any], complexity: Dict[str, Any]) -> DomainAnalysis:
    """Inspect training and inference logic for common ML / DL mistakes.

    The score starts at a neutral baseline and drops per detected risk,
    floored at 0.05.
    """

    problems: list = []
    advice: list = []
    rating = 0.74

    mentions_torch = "torch" in code
    looks_like_inference = "predict" in code.lower()

    # Inference without eval() leaves dropout / batch-norm in training mode.
    if mentions_torch and looks_like_inference and "model.eval()" not in code:
        problems.append(
            AnalysisIssue(
                title="Inference path may be missing eval mode",
                severity="high",
                description="Inference code should place the model in eval mode before prediction.",
            )
        )
        advice.append("Call model.eval() before inference to disable training-time behavior such as dropout.")
        rating -= 0.18

    if mentions_torch and looks_like_inference and "no_grad" not in code:
        advice.append("Wrap inference in torch.no_grad() to reduce memory usage and avoid unnecessary gradient tracking.")
        rating -= 0.12

    # Gradients computed but never applied hints at an incomplete training loop.
    if parsed.get("calls_backward") and not parsed.get("calls_optimizer_step"):
        problems.append(
            AnalysisIssue(
                title="Backward pass without optimizer step",
                severity="medium",
                description="Gradients are computed, but the optimizer step is not obvious in the snippet.",
            )
        )
        advice.append("Ensure optimizer.step() and optimizer.zero_grad() are placed correctly in the training loop.")
        rating -= 0.12

    if "CrossEntropyLoss" in code and "softmax(" in code:
        advice.append("CrossEntropyLoss expects raw logits; remove the explicit softmax before the loss when possible.")
        rating -= 0.05

    if not advice:
        advice.append("Add explicit train/eval mode transitions and log validation metrics during training.")

    return DomainAnalysis(
        domain="ml_dl",
        domain_score=max(0.05, round(rating, 4)),
        issues=problems,
        suggestions=advice,
        highlights={
            "uses_torch": float(parsed.get("uses_torch", False)),
            "has_eval_mode": float("model.eval()" in code),
            "has_no_grad": float("no_grad" in code),
            "time_complexity": complexity["time_complexity"],
        },
    )
analyzers/web_analyzer.py ADDED
@@ -0,0 +1,50 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Analyzer for FastAPI and backend web-service code."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from typing import Any, Dict
6
+
7
+ from schemas.response import AnalysisIssue, DomainAnalysis
8
+
9
+
10
def analyze_web_code(code: str, parsed: Dict[str, Any], complexity: Dict[str, Any]) -> DomainAnalysis:
    """Inspect API code for validation, routing, and backend safety concerns.

    The score starts at a neutral baseline and drops per detected risk,
    floored at 0.05.
    """

    problems: list = []
    advice: list = []
    rating = 0.76

    handlers = set(parsed.get("route_decorators", []))

    # Routes without a Pydantic layer accept unvalidated payloads.
    if handlers and not parsed.get("uses_pydantic"):
        problems.append(
            AnalysisIssue(
                title="Request validation model is missing",
                severity="high",
                description="Route handlers appear present, but no obvious Pydantic validation layer was detected.",
            )
        )
        advice.append("Add Pydantic request and response models for strict validation and type-safe contracts.")
        rating -= 0.2

    if handlers & {"get", "post", "put", "delete"} and "async def" not in code:
        advice.append("Prefer async FastAPI endpoints when the route performs I/O or awaits downstream services.")
        rating -= 0.08

    if "request.json()" in code or "request.body()" in code:
        advice.append("Validate raw request payloads before use; avoid trusting unchecked JSON input.")
        rating -= 0.08

    if not advice:
        advice.append("Add domain-specific response models and centralize dependency injection for cleaner API structure.")

    return DomainAnalysis(
        domain="web",
        domain_score=max(0.05, round(rating, 4)),
        issues=problems,
        suggestions=advice,
        highlights={
            "route_count": float(len(handlers)),
            "uses_validation": float(parsed.get("uses_pydantic", False)),
            "time_complexity": complexity["time_complexity"],
        },
    )
models/__init__.py ADDED
@@ -0,0 +1,5 @@
 
 
 
 
 
 
1
+ """PyTorch-backed model wrappers for the analyzer platform."""
2
+
3
+ from .pytorch_model import PyTorchCodeAnalyzerModel
4
+
5
+ __all__ = ["PyTorchCodeAnalyzerModel"]
models/pytorch_model.py ADDED
@@ -0,0 +1,149 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """PyTorch + transformers model wrapper for multi-domain code scoring."""
2
+
3
+ from __future__ import annotations
4
+
5
+ import hashlib
6
+ from typing import Dict, List, Sequence
7
+
8
+ import torch
9
+ import torch.nn.functional as F
10
+
11
+ try:
12
+ from transformers import AutoModel, AutoTokenizer
13
+ except Exception:
14
+ AutoModel = None # type: ignore[assignment]
15
+ AutoTokenizer = None # type: ignore[assignment]
16
+
17
+
18
# Reference sentences embedded once per process (cached in the model's
# prototype cache); a code document is assigned to the domain whose
# prototype embedding it is most cosine-similar to.
DOMAIN_PROTOTYPES: Dict[str, List[str]] = {
    "dsa": [
        "Binary search, hashmap optimization, recursion, dynamic programming, arrays, trees, graphs, stack, queue, complexity.",
        "Competitive programming algorithm with loops, memoization, prefix sums, and asymptotic analysis.",
    ],
    "data_science": [
        "Pandas dataframe transformation, numpy vectorization, feature leakage, train test split, iterrows misuse.",
        "Data cleaning pipeline using pandas, numpy, aggregation, joins, and vectorized operations.",
    ],
    "ml_dl": [
        "PyTorch model, training loop, optimizer, backward pass, eval mode, no_grad, loss function, dataloader.",
        "Machine learning inference and training code with torch, sklearn, tensors, gradients, and model checkpoints.",
    ],
    "web": [
        "FastAPI endpoint, request validation, Pydantic models, async routes, API security, backend service design.",
        "REST API backend with routers, dependency injection, input validation, serialization, and error handling.",
    ],
    "general": [
        "General Python utility code with readable structure, typing, tests, and maintainable abstractions.",
    ],
}

# Anchor sentences describing clearly good and clearly bad code; the model's
# quality score is a sigmoid of the similarity margin between the two buckets.
QUALITY_ANCHORS: Dict[str, List[str]] = {
    "high": [
        "Readable typed Python code with validation, efficient algorithms, vectorized operations, safe inference, and clean API boundaries.",
        "Production-ready code with small functions, docstrings, low complexity, and clear error handling.",
    ],
    "low": [
        "Brute-force nested loops, missing validation, unsafe input handling, missing eval mode, missing no_grad, and code smells.",
        "Hard to maintain code with high complexity, repeated scans, mutable side effects, and unclear structure.",
    ],
}
50
+
51
+
52
class _HashEmbeddingBackend:
    """Torch-native fallback when pretrained weights cannot be loaded.

    Produces deterministic, L2-normalized hashed bag-of-tokens embeddings so
    the rest of the pipeline can run without the transformers dependency.
    """

    def __init__(self, dimensions: int = 128) -> None:
        # Fixed identifiers so callers can report which backend produced scores.
        self.dimensions = dimensions
        self.model_id = "hashed-token-fallback"
        self.backend_name = "hashed-token-fallback"
        self.notes = ["Using hashed embeddings because pretrained transformer weights are unavailable."]

    def embed_texts(self, texts: Sequence[str]) -> torch.Tensor:
        """Return one normalized embedding row per input text."""
        rows = torch.zeros((len(texts), self.dimensions), dtype=torch.float32)
        for row, text in enumerate(texts):
            # Cap token count so pathological inputs stay cheap to hash.
            words = text.lower().split()[:512]
            if not words:
                # Give empty inputs a deterministic non-zero direction.
                rows[row, 0] = 1.0
                continue
            for word in words:
                fingerprint = hashlib.md5(word.encode("utf-8")).hexdigest()
                slot = int(fingerprint[:8], 16) % self.dimensions
                weight = -1.0 if int(fingerprint[8:10], 16) % 2 else 1.0
                rows[row, slot] += weight
        # The small epsilon keeps all-zero rows normalizable.
        return F.normalize(rows + 1e-6, dim=1)
74
+
75
+
76
class PyTorchCodeAnalyzerModel:
    """Score code using pretrained transformer embeddings plus prototype similarity.

    Embeds the code (plus context and static hints) once, then compares it by
    cosine similarity against per-domain prototype sentences and high/low
    quality anchors. Falls back to hashed embeddings when transformers or the
    pretrained weights are unavailable.
    """

    def __init__(self, model_id: str = "huggingface/CodeBERTa-small-v1") -> None:
        # The encoder is loaded lazily on first use; until then backend_name
        # optimistically reports the configured model id.
        self.model_id = model_id
        self.backend_name = model_id
        self.notes: List[str] = []
        self._tokenizer = None
        self._model = None
        # Used whenever the transformer path is unavailable.
        self._fallback = _HashEmbeddingBackend()
        # Prototype embeddings are computed once per bucket key and reused.
        self._prototype_cache: Dict[str, torch.Tensor] = {}

    def _ensure_loaded(self) -> None:
        """Attempt the pretrained load once; record the outcome in `notes`."""
        # Non-empty notes mean a previous attempt already resolved the backend,
        # so load failures are not retried on every call.
        if self._model is not None or self.notes:
            return
        if AutoTokenizer is None or AutoModel is None:
            # transformers is not importable at all: switch to the fallback.
            self.backend_name = self._fallback.backend_name
            self.notes = list(self._fallback.notes)
            return
        try:
            self._tokenizer = AutoTokenizer.from_pretrained(self.model_id)
            self._model = AutoModel.from_pretrained(self.model_id)
            self._model.eval()
            self.notes.append(f"Loaded pretrained encoder `{self.model_id}`.")
        except Exception as exc:
            # Download/compatibility failure: keep running on the fallback and
            # surface the reason to callers via notes.
            self.backend_name = self._fallback.backend_name
            self.notes = list(self._fallback.notes) + [f"Pretrained load failed: {type(exc).__name__}: {exc}"]

    def _embed_texts(self, texts: Sequence[str]) -> torch.Tensor:
        """Return one L2-normalized embedding row per input text."""
        self._ensure_loaded()
        if self._model is None or self._tokenizer is None:
            return self._fallback.embed_texts(texts)
        encoded = self._tokenizer(list(texts), padding=True, truncation=True, max_length=256, return_tensors="pt")
        with torch.no_grad():
            outputs = self._model(**encoded)
        hidden = outputs.last_hidden_state
        # Mean-pool over real (unpadded) tokens only; clamp avoids div-by-zero.
        mask = encoded["attention_mask"].unsqueeze(-1)
        pooled = (hidden * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1)
        return F.normalize(pooled, dim=1)

    def _prototype_matrix(self, bucket: str, texts: Sequence[str]) -> torch.Tensor:
        """Embed and cache the prototype sentences for one bucket key."""
        if bucket not in self._prototype_cache:
            self._prototype_cache[bucket] = self._embed_texts(texts)
        return self._prototype_cache[bucket]

    def predict(self, code: str, context_window: str, static_summary: Dict[str, object]) -> Dict[str, object]:
        """Predict domain probabilities and a model quality score."""

        # Concatenate code, caller context, and static hints into one document,
        # truncated to keep tokenization bounded.
        document = (
            f"Code:\n{code.strip()[:4000]}\n\n"
            f"Context:\n{context_window.strip()[:1000]}\n\n"
            f"Static hints:\n{static_summary}\n"
        )
        candidate = self._embed_texts([document])

        # Best cosine similarity per domain, rescaled from [-1, 1] to [0, 1].
        domain_scores: Dict[str, float] = {}
        for domain, texts in DOMAIN_PROTOTYPES.items():
            matrix = self._prototype_matrix(f"domain:{domain}", texts)
            similarity = torch.matmul(candidate, matrix.T).max().item()
            domain_scores[domain] = round((similarity + 1.0) / 2.0, 4)

        # Quality is a sigmoid of the margin between the high- and low-quality
        # anchors; the factor 4.0 sharpens the margin before squashing.
        high_matrix = self._prototype_matrix("quality:high", QUALITY_ANCHORS["high"])
        low_matrix = self._prototype_matrix("quality:low", QUALITY_ANCHORS["low"])
        high_similarity = torch.matmul(candidate, high_matrix.T).max().item()
        low_similarity = torch.matmul(candidate, low_matrix.T).max().item()
        ml_quality_score = torch.sigmoid(torch.tensor((high_similarity - low_similarity) * 4.0)).item()

        return {
            "domain_scores": domain_scores,
            "ml_quality_score": round(float(ml_quality_score), 4),
            "backend_name": self.backend_name,
            "model_id": self.model_id,
            "notes": list(self.notes),
        }
schemas/__init__.py ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Public schemas for the multi-domain analysis platform."""
2
+
3
+ from .request import AnalyzeCodeRequest
4
+ from .response import AnalyzeCodeResponse, AnalysisIssue, DomainAnalysis, ScoreBreakdown, StaticAnalysisSummary
5
+
6
+ __all__ = [
7
+ "AnalyzeCodeRequest",
8
+ "AnalyzeCodeResponse",
9
+ "AnalysisIssue",
10
+ "DomainAnalysis",
11
+ "ScoreBreakdown",
12
+ "StaticAnalysisSummary",
13
+ ]
schemas/request.py ADDED
@@ -0,0 +1,19 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Request schemas for code analysis endpoints and UI."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from typing import Literal
6
+
7
+ from pydantic import BaseModel, Field
8
+
9
+
10
# Accepted values for the optional domain override; "auto" lets the service
# pick the highest-confidence domain itself.
DomainHint = Literal["auto", "dsa", "data_science", "ml_dl", "web"]


class AnalyzeCodeRequest(BaseModel):
    """Validated input payload for multi-domain code analysis."""

    # Source code is the only required field; all context is optional.
    code: str = Field(..., min_length=1, description="Source code to analyze.")
    context_window: str = Field(default="", max_length=2000, description="Optional repository or task context.")
    traceback_text: str = Field(default="", max_length=2000, description="Optional runtime or test failure output.")
    domain_hint: DomainHint = Field(default="auto", description="Optional domain override when auto detection is not desired.")
schemas/response.py ADDED
@@ -0,0 +1,70 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Response schemas for the multi-domain analysis platform."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from typing import Dict, List, Literal
6
+
7
+ from pydantic import BaseModel, Field
8
+
9
+
10
+ DomainType = Literal["dsa", "data_science", "ml_dl", "web", "general"]
11
+ Severity = Literal["low", "medium", "high"]
12
+
13
+
14
class AnalysisIssue(BaseModel):
    """One detected issue or risk in the code snippet."""

    title: str  # short human-readable headline
    severity: Severity  # one of "low" | "medium" | "high"
    description: str  # one-sentence explanation of the risk
    line_hint: int | None = None  # optional line reference when known
21
+
22
+
23
class StaticAnalysisSummary(BaseModel):
    """Language-agnostic static-analysis signals."""

    syntax_valid: bool  # False when the snippet failed to parse
    syntax_error: str = ""  # parser message; empty when syntax_valid is True
    cyclomatic_complexity: int = Field(..., ge=1)  # 1 + branch count heuristic
    line_count: int = Field(..., ge=0)
    max_loop_depth: int = Field(..., ge=0)  # deepest loop/comprehension nesting
    time_complexity: str = "Unknown"  # heuristic Big-O label, e.g. "O(n^2)"
    space_complexity: str = "Unknown"  # heuristic Big-O label
    detected_imports: List[str] = Field(default_factory=list)  # top-level module names
    code_smells: List[str] = Field(default_factory=list)  # human-readable smell notes
35
+
36
+
37
class DomainAnalysis(BaseModel):
    """Domain-specific analysis payload returned by an analyzer."""

    domain: DomainType  # which analyzer produced this payload
    domain_score: float = Field(..., ge=0.0, le=1.0)  # higher is better
    issues: List[AnalysisIssue] = Field(default_factory=list)
    suggestions: List[str] = Field(default_factory=list)  # actionable advice, ordered by priority
    highlights: Dict[str, float | str] = Field(default_factory=dict)  # key metrics for display
45
+
46
+
47
class ScoreBreakdown(BaseModel):
    """Reward inputs and final normalized score."""

    ml_score: float = Field(..., ge=0.0, le=1.0)  # model-derived quality estimate
    domain_score: float = Field(..., ge=0.0, le=1.0)  # from the domain analyzer
    lint_score: float = Field(..., ge=0.0, le=1.0)  # structural cleanliness
    complexity_penalty: float = Field(..., ge=0.0, le=1.0)  # subtracted from reward
    reward: float = Field(..., ge=0.0, le=1.0)  # clamped weighted combination
55
+
56
+
57
class AnalyzeCodeResponse(BaseModel):
    """Top-level structured output for API and UI consumers."""

    detected_domain: DomainType  # winning domain after hint/model/heuristic merge
    domain_confidences: Dict[str, float]  # per-domain combined confidence
    score_breakdown: ScoreBreakdown
    static_analysis: StaticAnalysisSummary
    domain_analysis: DomainAnalysis
    improvement_plan: List[str] = Field(default_factory=list)  # ordered action steps
    model_backend: str  # backend that actually produced embeddings
    model_id: str  # configured encoder identifier
    summary: str  # one-line human-readable result
    context_window: str = ""  # echoed back from the request
    analysis_time_ms: float = Field(..., ge=0.0)  # wall-clock pipeline duration
services/__init__.py ADDED
@@ -0,0 +1,7 @@
 
 
 
 
 
 
 
 
1
+ """Service layer for orchestrating analysis, suggestions, and rewards."""
2
+
3
+ from .analysis_service import AnalysisService
4
+ from .reward_service import RewardService
5
+ from .suggestion_service import SuggestionService
6
+
7
+ __all__ = ["AnalysisService", "RewardService", "SuggestionService"]
services/analysis_service.py ADDED
@@ -0,0 +1,123 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Orchestration layer for multi-domain code analysis."""
2
+
3
+ from __future__ import annotations
4
+
5
+ import time
6
+ from typing import Any, Callable, Dict
7
+
8
+ from analyzers import analyze_data_science_code, analyze_dsa_code, analyze_ml_code, analyze_web_code
9
+ from models import PyTorchCodeAnalyzerModel
10
+ from schemas.request import AnalyzeCodeRequest
11
+ from schemas.response import AnalyzeCodeResponse, DomainAnalysis, StaticAnalysisSummary
12
+ from services.reward_service import RewardService
13
+ from services.suggestion_service import SuggestionService
14
+ from utils import estimate_complexity, parse_code_structure
15
+
16
+
17
def _lint_score(parsed: Dict[str, Any]) -> float:
    """Convert structural smells into a normalized lint-style score in [0, 1]."""

    rating = 1.0
    # Unparseable code dominates every other penalty.
    if not parsed.get("syntax_valid", True):
        rating -= 0.45
    # Up to five long lines are penalized; beyond that the cap applies.
    rating -= min(parsed.get("long_lines", 0), 5) * 0.03
    if parsed.get("tabs_used"):
        rating -= 0.1
    if parsed.get("trailing_whitespace_lines"):
        rating -= 0.05
    # Functions exist but none of them carries a docstring.
    if parsed.get("function_names") and parsed.get("docstring_ratio", 0.0) == 0.0:
        rating -= 0.08
    return round(max(0.0, min(1.0, rating)), 4)
31
+
32
+
33
class AnalysisService:
    """End-to-end analysis pipeline shared by API and UI.

    Combines model-based domain scoring with import/syntax heuristics, runs
    the matching domain analyzer, and assembles the full structured response.
    """

    def __init__(self) -> None:
        self.model = PyTorchCodeAnalyzerModel()
        self.reward_service = RewardService()
        self.suggestion_service = SuggestionService()
        # Dispatch table from detected domain to its analyzer. "general" has
        # no entry on purpose and falls back to a generic DomainAnalysis.
        self._analyzers: Dict[str, Callable[[str, Dict[str, Any], Dict[str, Any]], DomainAnalysis]] = {
            "dsa": analyze_dsa_code,
            "data_science": analyze_data_science_code,
            "ml_dl": analyze_ml_code,
            "web": analyze_web_code,
        }

    def _heuristic_domain_scores(self, parsed: Dict[str, Any], code: str) -> Dict[str, float]:
        """Derive domain priors from imports and syntax-level hints."""

        scores = {
            "dsa": 0.2 + (0.15 if parsed.get("uses_recursion") else 0.0) + (0.15 if parsed.get("max_loop_depth", 0) >= 1 else 0.0),
            "data_science": 0.2 + (0.35 if parsed.get("uses_pandas") or parsed.get("uses_numpy") else 0.0),
            "ml_dl": 0.2 + (0.35 if parsed.get("uses_torch") or parsed.get("uses_sklearn") else 0.0),
            "web": 0.2 + (0.35 if parsed.get("uses_fastapi") or parsed.get("uses_flask") else 0.0) + (0.1 if parsed.get("route_decorators") else 0.0),
            "general": 0.2,
        }
        # Textual mentions act as weaker secondary signals.
        lowered = code.lower()
        if "fastapi" in lowered:
            scores["web"] += 0.1
        if "pandas" in lowered or "numpy" in lowered:
            scores["data_science"] += 0.1
        if "torch" in lowered:
            scores["ml_dl"] += 0.1
        if "while" in code or "for" in code:
            scores["dsa"] += 0.05
        return {key: round(min(value, 0.99), 4) for key, value in scores.items()}

    def analyze(self, request: AnalyzeCodeRequest) -> AnalyzeCodeResponse:
        """Run the complete multi-domain analysis pipeline."""

        started = time.perf_counter()
        parsed = parse_code_structure(request.code)
        complexity = estimate_complexity(parsed, request.code)
        model_prediction = self.model.predict(request.code, request.context_window, parsed)
        heuristic_scores = self._heuristic_domain_scores(parsed, request.code)

        # Blend model similarity with heuristic priors (model-weighted).
        combined_scores: Dict[str, float] = {}
        for domain, heuristic_score in heuristic_scores.items():
            model_score = float(model_prediction["domain_scores"].get(domain, 0.2))
            combined_scores[domain] = round((0.6 * model_score) + (0.4 * heuristic_score), 4)

        # An explicit hint wins over auto-detection.
        detected_domain = request.domain_hint if request.domain_hint != "auto" else max(combined_scores, key=combined_scores.get)

        # BUG FIX: the previous dispatch passed a contradictory default to
        # `.get(...)` that was dead code (the `in self._analyzers` check made
        # it unreachable) and would have routed non-registered domains to the
        # web analyzer. Dispatch explicitly instead.
        analyzer = self._analyzers.get(detected_domain)
        if analyzer is not None:
            domain_analysis = analyzer(request.code, parsed, complexity)
        else:
            domain_analysis = DomainAnalysis(
                domain="general",
                domain_score=0.6,
                issues=[],
                suggestions=["Add stronger domain-specific context for deeper analysis."],
                highlights={},
            )

        lint_score = _lint_score(parsed)
        score_breakdown = self.reward_service.compute(
            ml_score=float(model_prediction["ml_quality_score"]),
            domain_score=domain_analysis.domain_score,
            lint_score=lint_score,
            complexity_penalty=float(complexity["complexity_penalty"]),
        )
        static_analysis = StaticAnalysisSummary(
            syntax_valid=bool(parsed["syntax_valid"]),
            syntax_error=str(parsed["syntax_error"]),
            cyclomatic_complexity=int(complexity["cyclomatic_complexity"]),
            line_count=int(parsed["line_count"]),
            max_loop_depth=int(parsed["max_loop_depth"]),
            time_complexity=str(complexity["time_complexity"]),
            space_complexity=str(complexity["space_complexity"]),
            detected_imports=list(parsed["imports"]),
            code_smells=list(parsed["code_smells"]),
        )
        improvement_plan = self.suggestion_service.build_improvement_plan(
            domain_analysis=domain_analysis,
            static_analysis=static_analysis,
        )
        summary = (
            f"Detected `{detected_domain}` code with a model score of {score_breakdown.ml_score:.0%}, "
            f"domain score {score_breakdown.domain_score:.0%}, and final reward {score_breakdown.reward:.0%}."
        )
        return AnalyzeCodeResponse(
            detected_domain=detected_domain,  # type: ignore[arg-type]
            domain_confidences=combined_scores,
            score_breakdown=score_breakdown,
            static_analysis=static_analysis,
            domain_analysis=domain_analysis,
            improvement_plan=improvement_plan,
            model_backend=str(model_prediction["backend_name"]),
            model_id=str(model_prediction["model_id"]),
            summary=summary,
            context_window=request.context_window,
            analysis_time_ms=round((time.perf_counter() - started) * 1000.0, 2),
        )
services/reward_service.py ADDED
@@ -0,0 +1,27 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Reward shaping logic for RL-ready code analysis scores."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from schemas.response import ScoreBreakdown
6
+
7
+
8
class RewardService:
    """Compute reward scores from model, domain, lint, and complexity signals."""

    def compute(self, *, ml_score: float, domain_score: float, lint_score: float, complexity_penalty: float) -> ScoreBreakdown:
        """Apply the weighted reward formula and clamp the result to [0, 1]."""

        # Model quality dominates; domain and lint contribute equally; the
        # complexity penalty is subtracted.
        raw = (0.4 * ml_score) + (0.2 * domain_score) + (0.2 * lint_score) - (0.2 * complexity_penalty)
        final_reward = max(0.0, min(1.0, raw))
        return ScoreBreakdown(
            ml_score=round(ml_score, 4),
            domain_score=round(domain_score, 4),
            lint_score=round(lint_score, 4),
            complexity_penalty=round(complexity_penalty, 4),
            reward=round(final_reward, 4),
        )
services/suggestion_service.py ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Suggestion and improvement-plan generation for analyzed code."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from schemas.response import DomainAnalysis, StaticAnalysisSummary
6
+
7
+
8
class SuggestionService:
    """Build high-signal improvement steps from analysis output."""

    def build_improvement_plan(self, *, domain_analysis: DomainAnalysis, static_analysis: StaticAnalysisSummary) -> list[str]:
        """Return a compact three-step plan optimized for developer action."""

        # Step 1 leads with the most urgent correctness concern: a syntax
        # error trumps any domain issue, which trumps the generic advice.
        if not static_analysis.syntax_valid:
            first = f"Step 1 - Correctness and safety: fix the syntax error first ({static_analysis.syntax_error})."
        elif domain_analysis.issues:
            first = f"Step 1 - Correctness and safety: {domain_analysis.issues[0].description}"
        else:
            first = "Step 1 - Correctness and safety: Stabilize correctness first and keep the public behavior explicit."

        second = "Step 2 - Edge cases: test empty inputs, boundary values, malformed payloads, and failure-mode behavior explicitly."
        third = "Step 3 - Scalability: reduce repeated scans, lower cyclomatic complexity, and benchmark the path on realistic input sizes."
        # The analyzer's top suggestion is appended as a priority hint.
        if domain_analysis.suggestions:
            third = f"{third} Priority hint: {domain_analysis.suggestions[0]}"
        return [first, second, third]
utils/__init__.py ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
+ """Utility helpers for AST parsing and complexity scoring."""
2
+
3
+ from .ast_parser import parse_code_structure
4
+ from .complexity import estimate_complexity
5
+
6
+ __all__ = ["parse_code_structure", "estimate_complexity"]
utils/ast_parser.py ADDED
@@ -0,0 +1,144 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Static parsing helpers for multi-domain Python code analysis."""
2
+
3
+ from __future__ import annotations
4
+
5
+ import ast
6
+ from typing import Any, Dict, List
7
+
8
+
9
class _LoopDepthVisitor(ast.NodeVisitor):
    """Collect loop nesting depth for a parsed Python module.

    After `visit(tree)`, `max_depth` holds the deepest observed nesting of
    for/while statements and comprehension generators.
    """

    def __init__(self) -> None:
        # Current nesting level and the deepest level observed so far.
        self.depth = 0
        self.max_depth = 0

    def _visit_loop(self, node: ast.AST) -> None:
        """Descend through a loop construct while tracking nesting depth."""
        self.depth += 1
        if self.depth > self.max_depth:
            self.max_depth = self.depth
        self.generic_visit(node)
        self.depth -= 1

    def visit_For(self, node: ast.For) -> None:  # noqa: N802
        self._visit_loop(node)

    def visit_While(self, node: ast.While) -> None:  # noqa: N802
        self._visit_loop(node)

    def visit_comprehension(self, node: ast.comprehension) -> None:  # noqa: N802
        # Each comprehension generator counts as one loop level.
        self._visit_loop(node)
+
31
+
32
+ def parse_code_structure(code: str) -> Dict[str, Any]:
33
+ """Parse Python code into reusable structural signals."""
34
+
35
+ summary: Dict[str, Any] = {
36
+ "syntax_valid": True,
37
+ "syntax_error": "",
38
+ "imports": [],
39
+ "function_names": [],
40
+ "class_names": [],
41
+ "loop_count": 0,
42
+ "branch_count": 0,
43
+ "max_loop_depth": 0,
44
+ "line_count": len(code.splitlines()),
45
+ "long_lines": 0,
46
+ "tabs_used": "\t" in code,
47
+ "trailing_whitespace_lines": 0,
48
+ "uses_numpy": False,
49
+ "uses_pandas": False,
50
+ "uses_torch": False,
51
+ "uses_sklearn": False,
52
+ "uses_fastapi": False,
53
+ "uses_flask": False,
54
+ "uses_pydantic": False,
55
+ "uses_recursion": False,
56
+ "calls_eval": False,
57
+ "calls_no_grad": False,
58
+ "calls_backward": False,
59
+ "calls_optimizer_step": False,
60
+ "route_decorators": [],
61
+ "docstring_ratio": 0.0,
62
+ "code_smells": [],
63
+ }
64
+
65
+ lines = code.splitlines()
66
+ summary["long_lines"] = sum(1 for line in lines if len(line) > 88)
67
+ summary["trailing_whitespace_lines"] = sum(1 for line in lines if line.rstrip() != line)
68
+
69
+ try:
70
+ tree = ast.parse(code)
71
+ except SyntaxError as exc:
72
+ summary["syntax_valid"] = False
73
+ summary["syntax_error"] = f"{exc.msg} (line {exc.lineno})"
74
+ summary["code_smells"].append("Code does not parse.")
75
+ return summary
76
+
77
+ visitor = _LoopDepthVisitor()
78
+ visitor.visit(tree)
79
+ summary["max_loop_depth"] = visitor.max_depth
80
+
81
+ functions = [node for node in tree.body if isinstance(node, ast.FunctionDef)]
82
+ summary["function_names"] = [node.name for node in functions]
83
+ summary["class_names"] = [node.name for node in tree.body if isinstance(node, ast.ClassDef)]
84
+ summary["docstring_ratio"] = (
85
+ sum(1 for node in functions if ast.get_docstring(node)) / len(functions)
86
+ if functions
87
+ else 0.0
88
+ )
89
+
90
+ imports: List[str] = []
91
+ for node in ast.walk(tree):
92
+ if isinstance(node, ast.Import):
93
+ imports.extend(alias.name.split(".")[0] for alias in node.names)
94
+ elif isinstance(node, ast.ImportFrom) and node.module:
95
+ imports.append(node.module.split(".")[0])
96
+ elif isinstance(node, (ast.For, ast.While, ast.comprehension)):
97
+ summary["loop_count"] += 1
98
+ elif isinstance(node, (ast.If, ast.Try, ast.Match)):
99
+ summary["branch_count"] += 1
100
+ elif isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute):
101
+ attr = node.func.attr
102
+ if attr == "eval":
103
+ summary["calls_eval"] = True
104
+ elif attr == "backward":
105
+ summary["calls_backward"] = True
106
+ elif attr == "step":
107
+ summary["calls_optimizer_step"] = True
108
+ elif isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id == "print":
109
+ summary["code_smells"].append("Debug print statements are present.")
110
+ elif isinstance(node, ast.With):
111
+ if any(isinstance(item.context_expr, ast.Call) and isinstance(item.context_expr.func, ast.Attribute) and item.context_expr.func.attr == "no_grad" for item in node.items):
112
+ summary["calls_no_grad"] = True
113
+
114
+ import_set = sorted(set(imports))
115
+ summary["imports"] = import_set
116
+ summary["uses_numpy"] = "numpy" in import_set or "np" in code
117
+ summary["uses_pandas"] = "pandas" in import_set or "pd" in code
118
+ summary["uses_torch"] = "torch" in import_set
119
+ summary["uses_sklearn"] = "sklearn" in import_set
120
+ summary["uses_fastapi"] = "fastapi" in import_set
121
+ summary["uses_flask"] = "flask" in import_set
122
+ summary["uses_pydantic"] = "pydantic" in import_set or "BaseModel" in code
123
+
124
+ for node in functions:
125
+ for child in ast.walk(node):
126
+ if isinstance(child, ast.Call) and isinstance(child.func, ast.Name) and child.func.id == node.name:
127
+ summary["uses_recursion"] = True
128
+
129
+ for node in ast.walk(tree):
130
+ if isinstance(node, ast.FunctionDef):
131
+ for decorator in node.decorator_list:
132
+ if isinstance(decorator, ast.Call) and isinstance(decorator.func, ast.Attribute):
133
+ summary["route_decorators"].append(decorator.func.attr)
134
+ elif isinstance(decorator, ast.Attribute):
135
+ summary["route_decorators"].append(decorator.attr)
136
+
137
+ if summary["long_lines"]:
138
+ summary["code_smells"].append("Long lines reduce readability.")
139
+ if summary["tabs_used"]:
140
+ summary["code_smells"].append("Tabs detected; prefer spaces for consistency.")
141
+ if summary["trailing_whitespace_lines"]:
142
+ summary["code_smells"].append("Trailing whitespace found.")
143
+
144
+ return summary
utils/complexity.py ADDED
@@ -0,0 +1,37 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Complexity heuristics for DSA-style and general Python code."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from typing import Any, Dict
6
+
7
+
8
def estimate_complexity(parsed: Dict[str, Any], code: str) -> Dict[str, Any]:
    """Estimate cyclomatic complexity and rough Big-O heuristics.

    The labels are intentionally coarse: loop nesting dominates, then a sort
    marker, then a single loop or recursion, else constant time.
    """

    branches = int(parsed.get("branch_count", 0))
    nesting = int(parsed.get("max_loop_depth", 0))
    recursive = bool(parsed.get("uses_recursion", False))
    cyclomatic = 1 + branches

    if nesting >= 3:
        big_o = "O(n^3)"
    elif nesting == 2:
        big_o = "O(n^2)"
    elif "sorted(" in code or ".sort(" in code:
        big_o = "O(n log n)"
    elif nesting == 1 or recursive:
        big_o = "O(n)"
    else:
        big_o = "O(1)"

    # Any container-building marker implies linear auxiliary space.
    allocates = any(marker in code for marker in ("append(", "list(", "dict(", "set("))
    space = "O(n)" if allocates else "O(1)"

    penalty = min(0.99, 0.08 + (cyclomatic * 0.04) + (nesting * 0.12))
    return {
        "cyclomatic_complexity": cyclomatic,
        "time_complexity": big_o,
        "space_complexity": space,
        "complexity_penalty": round(penalty, 4),
    }
+ }