aaditkumar committed on
Commit
5d7e1ed
·
verified ·
1 Parent(s): 48ab4ef

Upload 28 files

Browse files
app/__init__.py ADDED
@@ -0,0 +1,19 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ J.A.R.V.I.S APPLICATION PACKAGE
3
+ ===============================
4
+
5
+ This directory is the main Python package for the J.A.R.V.I.S backend.
6
+ The presence of __init__.py makes Python treat 'app' as a package, so you can:
7
+
8
+ from app.main import app
9
+ from app.models import ChatRequest
10
+ from app.services.chat_service import ChatService
11
+
12
+ FILE STRUCTURE:
13
+ app/
14
+ __init__.py - This file; marks 'app' as a package.
15
+ main.py - FastAPI app and all HTTP endpoints (/chat, /chat/realtime, /health, etc.).
16
+ models.py - Pydantic models for API requests, responses, and internal chat storage.
17
+ services/ - Business logic: chat sessions, Groq LLM, realtime (Tavily + Groq), vector store.
18
+ utils/ - Helpers: retry with backoff, current date/time for the LLM prompt.
19
+ """
app/__pycache__/__init__.cpython-312.pyc ADDED
Binary file (928 Bytes). View file
 
app/__pycache__/generate_thinking_audio.cpython-312.pyc ADDED
Binary file (3.27 kB). View file
 
app/__pycache__/main.cpython-312.pyc ADDED
Binary file (30.9 kB). View file
 
app/__pycache__/models.cpython-312.pyc ADDED
Binary file (1.53 kB). View file
 
app/generate_thinking_audio.py ADDED
@@ -0,0 +1,66 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import sys
3
+ from pathlib import Path
4
+
5
# Project root is the parent of the app/ directory; clips land in frontend/audio/.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
AUDIO_DIR = PROJECT_ROOT / "frontend" / "audio"

# (file_stem, spoken_text) pairs; each is rendered to AUDIO_DIR/<stem>.mp3.
STARTER_PHRASES = [
    ("starter_1", "One moment please."),
    ("starter_2", "Sure, one moment."),
    ("starter_3", "Got it, hold on."),
    ("starter_4", "On it right now."),
    ("starter_5", "Alright, give me a sec."),
    ("starter_6", "Right, one moment."),
    ("starter_7", "Okay, hold on."),
    ("starter_8", "One second please."),
    ("starter_9", "Give me a moment."),
    ("starter_10", "Just a moment please."),
]

PHRASES = STARTER_PHRASES
# edge-tts voice and speaking-rate offset applied to every clip.
VOICE = "en-GB-RyanNeural"
RATE = "+15%"
24
+
25
async def generate_one(name: str, text: str) -> bool:
    """Synthesize *text* to AUDIO_DIR/<name>.mp3; return True on success."""
    try:
        import edge_tts
    except ImportError:
        # edge-tts missing: the caller counts this file as failed.
        return False
    target = AUDIO_DIR / f"{name}.mp3"
    try:
        tts = edge_tts.Communicate(text, VOICE, rate=RATE)
        await tts.save(str(target))
    except Exception as e:
        print(f" [FAIL] {name}.mp3: {e}")
        return False
    print(f" [OK] {name}.mp3")
    return True
39
+
40
async def main():
    """Regenerate every starter phrase as MP3; return a process exit code."""
    try:
        import edge_tts  # availability probe only
    except ImportError:
        print("edge-tts not installed. Run: pip install edge-tts")
        return 1
    AUDIO_DIR.mkdir(parents=True, exist_ok=True)
    # Old follow-up clips are obsolete; remove them best-effort.
    for stale in AUDIO_DIR.glob("followup_*.mp3"):
        try:
            stale.unlink()
            print(f" [REMOVED] {stale.name}")
        except OSError:
            pass
    print(f"Generating thinking audio in {AUDIO_DIR}...")
    generated = 0
    for name, text in PHRASES:
        if await generate_one(name, text):
            generated += 1
    print(f"Done: {generated}/{len(PHRASES)} files.")
    return 0 if generated == len(PHRASES) else 1
60
+
61
# Script entry point: run the async generator and exit with its status code.
if __name__ == "__main__":
    try:
        exit_code = asyncio.run(main())
    except KeyboardInterrupt:
        # 130 = 128 + SIGINT(2), the conventional exit status for Ctrl-C.
        exit_code = 130
    sys.exit(exit_code)
app/main.py ADDED
@@ -0,0 +1,523 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pathlib import Path
2
+ from fastapi import FastAPI, HTTPException
3
+ from fastapi.middleware.cors import CORSMiddleware
4
+ from fastapi.responses import StreamingResponse, RedirectResponse
5
+ from fastapi.staticfiles import StaticFiles
6
+ from starlette.middleware.base import BaseHTTPMiddleware
7
+ from starlette.requests import Request
8
+ from contextlib import asynccontextmanager
9
+ import uvicorn
10
+ import logging
11
+ import json
12
+ import time
13
+ import re
14
+ import base64
15
+ import asyncio
16
+ from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
17
+ import edge_tts
18
+ from app.models import ChatRequest, ChatResponse, TTSRequest
19
+
20
# User-facing detail string returned with HTTP 429 when a Groq quota error is detected.
RATE_LIMIT_MESSAGE = (
    "You've reached your daily API limit for this assistant. "
    "Your credits will reset in a few hours, or you can upgrade your plan for more. "
    "Please try again later."
)
25
+
26
+ def _is_rate_limit_error(exc: Exception) -> bool:
27
+ msg = str(exc).lower()
28
+ return "429" in str(exc) or "rate limit" in msg or "tokens per day" in msg
29
+
30
+ from app.services.vector_store import VectorStoreService
31
+ from app.services.groq_service import GroqService, AllGroqApisFailedError
32
+ from app.services.realtime_service import RealtimeGroqService
33
+ from app.services.chat_service import ChatService
34
+ from app.services.brain_service import BrainService
35
+ from config import (
36
+ VECTOR_STORE_DIR, GROQ_API_KEYS, GROQ_MODEL, TAVILY_API_KEY,
37
+ EMBEDDING_MODEL, CHUNK_SIZE, CHUNK_OVERLAP, MAX_CHAT_HISTORY_TURNS,
38
+ ASSISTANT_NAME, TTS_VOICE, TTS_RATE,
39
+ )
40
+
41
# Root logging: timestamp plus aligned level/name columns for readable startup logs.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger("J.A.R.V.I.S")
47
+
48
# Service singletons, populated by lifespan() at startup; None until then.
vector_store_service: VectorStoreService | None = None
groq_service: GroqService | None = None
realtime_service: RealtimeGroqService | None = None
brain_service: BrainService | None = None
chat_service: ChatService | None = None
53
+
54
def print_title():
    """Print the J.A.R.V.I.S ASCII-art banner to stdout (shown once at startup)."""
    title = r"""
╔══════════════════════════════════════════════════════════╗
║ ║
║ ██╗ █████╗ ██████╗ ██╗ ██╗██╗███████╗ ║
║ ██║██╔══██╗██╔══██╗██║ ██║██║██╔════╝ ║
║ ██║███████║██████╔╝██║ ██║██║███████╗ ║
║ ██ ██║██╔══██║██╔══██╗╚██╗ ██╔╝██║╚════██║ ║
║ ╚█████╔╝██║ ██║██║ ██║ ╚████╔╝ ██║███████║ ║
║ ╚════╝ ╚═╝ ╚═╝╚═╝ ╚═╝ ╚═══╝ ╚═╝╚══════╝ ║
║ ║
║ Just A Rather Very Intelligent System ║
║ ║
╚══════════════════════════════════════════════════════════╝
"""

    print(title)
72
+
73
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: build every service at startup, persist sessions at shutdown.

    BUGFIX: the shutdown code previously lived inside the same try-block as
    startup, so an exception raised while *serving* (propagating through
    ``yield``) was mislogged as "Fatal error during startup" and the session
    save / TTS pool shutdown were skipped. Startup errors are now handled
    separately, and shutdown runs in a ``finally`` so sessions are saved even
    on an unclean exit.
    """
    global vector_store_service, groq_service, realtime_service, brain_service, chat_service

    print_title()
    logger.info("=" * 60)
    logger.info("J.A.R.V.I.S - Starting Up...")
    logger.info("-" * 60)
    logger.info("[CONFIG] Assistant name: %s", ASSISTANT_NAME)
    logger.info("[CONFIG] Groq model: %s", GROQ_MODEL)
    logger.info("[CONFIG] Groq API keys loaded: %d", len(GROQ_API_KEYS))
    logger.info("[CONFIG] Tavily API key: %s", "configured" if TAVILY_API_KEY else "NOT SET")
    logger.info("[CONFIG] Embedding model: %s", EMBEDDING_MODEL)
    logger.info("[CONFIG] Chunk size: %d | Overlap: %d | Max history turns: %d",
                CHUNK_SIZE, CHUNK_OVERLAP, MAX_CHAT_HISTORY_TURNS)

    try:
        logger.info("Initializing vector store service...")
        t0 = time.perf_counter()
        vector_store_service = VectorStoreService()
        vector_store_service.create_vector_store()
        logger.info("[TIMING] startup_vector_store: %.3fs", time.perf_counter() - t0)

        logger.info("Initializing Groq service (general queries)...")
        groq_service = GroqService(vector_store_service)
        logger.info("Groq service initialized successfully")

        logger.info("Initializing Realtime Groq service (with Tavily search)...")
        realtime_service = RealtimeGroqService(vector_store_service)
        logger.info("Realtime Groq service initialized successfully")

        logger.info("Initializing Brain service (Groq query classification)...")
        brain_service = BrainService()
        logger.info("Brain service initialized successfully")

        logger.info("Initializing chat service...")
        chat_service = ChatService(groq_service, realtime_service, brain_service)
        logger.info("Chat service initialized successfully")

        logger.info("=" * 60)
        logger.info("Service Status:")
        logger.info(" - Vector Store: Ready")
        logger.info(" - Groq AI (General): Ready")
        logger.info(" - Groq AI (Realtime): Ready")
        logger.info(" - Brain (Groq): Ready")
        logger.info(" - Chat Service: Ready")
        logger.info("=" * 60)
        logger.info("J.A.R.V.I.S is online and ready!")
        logger.info("API: http://localhost:8000")
        logger.info("Frontend: http://localhost:8000/")
        logger.info("-" * 60)
    except Exception as e:
        # Startup failed: surface the error and refuse to launch.
        logger.error(f"Fatal error during startup: {e}", exc_info=True)
        raise

    try:
        yield
    finally:
        # Always runs on shutdown, clean or not: stop TTS workers, flush sessions.
        logger.info("\nShutting down J.A.R.V.I.S...")
        _tts_pool.shutdown(wait=True)
        if chat_service:
            for session_id in list(chat_service.sessions.keys()):
                chat_service.save_chat_session(session_id)
        logger.info("All sessions saved. Goodbye!")
137
+
138
# FastAPI application. The interactive docs endpoints are disabled on purpose
# (docs_url/redoc_url/openapi_url=None) so the API surface is not advertised.
app = FastAPI(
    title="J.A.R.V.I.S API",
    description="Just A Rather Very Intelligent System",
    lifespan=lifespan,
    docs_url=None,
    redoc_url=None,
    openapi_url=None
)

# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# maximally permissive; browsers reject credentialed wildcard-origin responses,
# so pin concrete origins if cookie/auth credentials are ever used.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
154
+
155
class TimingMiddleware(BaseHTTPMiddleware):
    """Log method, path, status code and wall-clock duration for every request."""

    async def dispatch(self, request: Request, call_next):
        started = time.perf_counter()
        response = await call_next(request)
        duration = time.perf_counter() - started
        logger.info(
            "[REQUEST] %s %s -> %s (%.3fs)",
            request.method, request.url.path, response.status_code, duration,
        )
        return response

app.add_middleware(TimingMiddleware)
165
+
166
@app.get("/api")
async def api_info():
    """Describe the API: service name plus a map of endpoint -> one-line summary."""
    endpoints = {
        "/chat": "General chat (non-streaming)",
        "/chat/stream": "General chat (streaming chunks)",
        "/chat/realtime": "Realtime chat (non-streaming)",
        "/chat/realtime/stream": "Realtime chat (streaming chunks)",
        "/chat/jarvis/stream": "Jarvis unified route (brain classifies, streams)",
        "/chat/history/{session_id}": "Get chat history",
        "/health": "System health check",
        "/tts": "Text-to-speech (POST text, returns streamed MP3)",
    }
    return {"message": "J.A.R.V.I.S API", "endpoints": endpoints}
181
+
182
@app.get("/health")
async def health():
    """Liveness probe: report which service singletons lifespan() has built."""
    try:
        return {
            "status": "healthy",
            # Each flag is True once the corresponding global is initialized.
            "vector_store": vector_store_service is not None,
            "groq_service": groq_service is not None,
            "realtime_service": realtime_service is not None,
            "brain_service": brain_service is not None,
            "chat_service": chat_service is not None
        }
    except Exception as e:
        logger.warning("[API /health] Error: %s", e)
        return {"status": "degraded", "error": str(e)}
196
+
197
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Non-streaming general chat: process one message and return the full reply.

    Error mapping: 503 service missing / all Groq keys failed, 400 invalid
    session id, 429 provider quota, 500 anything else.
    """
    if not chat_service:
        raise HTTPException(status_code=503, detail="Chat service not initialized")

    logger.info("[API /chat] Incoming | session_id=%s | message_len=%d | message=%.100s",
                request.session_id or "new", len(request.message), request.message)

    try:
        session_id = chat_service.get_or_create_session(request.session_id)
        response_text = chat_service.process_message(session_id, request.message)
        # Persist the turn immediately so history survives a crash/restart.
        chat_service.save_chat_session(session_id)
        logger.info("[API /chat] Done | session_id=%s | response_len=%d", session_id[:12], len(response_text))
        return ChatResponse(response=response_text, session_id=session_id)
    except ValueError as e:
        # Raised for a malformed session id.
        logger.warning("[API /chat] Invalid session_id: %s", e)
        raise HTTPException(status_code=400, detail=str(e))
    except AllGroqApisFailedError as e:
        logger.error("[API /chat] All Groq APIs failed: %s", e)
        raise HTTPException(status_code=503, detail=str(e))
    except Exception as e:
        # Map provider quota errors to a friendly 429; everything else is a 500.
        if _is_rate_limit_error(e):
            logger.warning("[API /chat] Rate limit hit: %s", e)
            raise HTTPException(status_code=429, detail=RATE_LIMIT_MESSAGE)
        logger.error("[API /chat] Error: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Error processing chat: {str(e)}")
223
+
224
+ _SPLIT_RE = re.compile(r"(?<=[\.!?,;:])\s+")
225
+ _MIN_WORDS_FIRST = 2
226
+ _MIN_WORDS = 3
227
+ _MERGE_IF_WORDS = 2
228
+
229
+ def _split_sentences(buf: str):
230
+ parts = _SPLIT_RE.split(buf)
231
+ if len(parts) <= 1:
232
+ return [], buf
233
+ raw = [p.strip() for p in parts[:-1] if p.strip()]
234
+ sentences, pending = [], ""
235
+ for s in raw:
236
+ if pending:
237
+ s = (pending + " " + s).strip()
238
+ pending = ""
239
+ min_req = _MIN_WORDS_FIRST if not sentences else _MIN_WORDS
240
+ if len(s.split()) < min_req:
241
+ pending = s
242
+ continue
243
+ sentences.append(s)
244
+ remaining = (pending + " " + parts[-1].strip()).strip() if pending else parts[-1].strip()
245
+ return sentences, remaining
246
+
247
+ def _merge_short(sentences):
248
+ if not sentences:
249
+ return []
250
+ merged, i = [], 0
251
+ while i < len(sentences):
252
+ cur = sentences[i]
253
+ j = i + 1
254
+ while j < len(sentences) and len(sentences[j].split()) <= _MERGE_IF_WORDS:
255
+ cur = (cur + " " + sentences[j]).strip()
256
+ j += 1
257
+ merged.append(cur)
258
+ i = j
259
+ return merged
260
+
261
def _generate_tts_sync(text: str, voice: str, rate: str) -> bytes:
    """Blocking edge-tts synthesis: return the complete MP3 payload for *text*.

    Spins up its own event loop via asyncio.run, so it must be called from a
    worker thread (see _tts_pool below), never from the server's running loop.
    """
    async def _collect() -> bytes:
        audio = bytearray()
        stream = edge_tts.Communicate(text=text, voice=voice, rate=rate).stream()
        async for piece in stream:
            if piece["type"] == "audio":
                audio.extend(piece["data"])
        return bytes(audio)

    return asyncio.run(_collect())

# Worker threads running _generate_tts_sync while the SSE generator keeps streaming.
_tts_pool = ThreadPoolExecutor(max_workers=4)
272
+
273
def _stream_generator(session_id: str, chunk_iter, is_realtime: bool, tts_enabled: bool = False):
    """Turn an LLM chunk iterator into Server-Sent Events, optionally with inline TTS.

    Emits SSE "data:" events: an opening session event, text chunks, optional
    'activity' / 'search_results' passthrough events, base64 MP3 'audio' events
    (when tts_enabled), and a final done event. TTS is pipelined: sentences are
    submitted to _tts_pool as they complete and ready audio is drained between
    text chunks so playback can start before generation finishes.
    """
    # Opening event so the client learns its session id immediately.
    yield f"data: {json.dumps({'session_id': session_id, 'chunk': '', 'done': False})}\n\n"

    buffer = ""          # unterminated text accumulated across chunks
    held = None          # last complete sentence, held back so a short follow-up can merge into it
    is_first = True      # first sentence uses the lower _MIN_WORDS_FIRST threshold
    audio_queue = []     # FIFO of (Future[bytes], sentence) pairs, in submit order

    def _submit(text):
        # Queue one sentence for background TTS; ignore blank input.
        if not text or not text.strip():
            return
        audio_queue.append((_tts_pool.submit(_generate_tts_sync, text, TTS_VOICE, TTS_RATE), text))

    def _drain_ready():
        # Pop finished futures from the FRONT only, preserving sentence order.
        events = []
        while audio_queue and audio_queue[0][0].done():
            fut, sent = audio_queue.pop(0)
            try:
                audio = fut.result()
                b64 = base64.b64encode(audio).decode("ascii")
                events.append(f"data: {json.dumps({'audio': b64, 'sentence': sent})}\n\n")
            except Exception as exc:
                # Best-effort: a failed clip is logged and skipped, text still streams.
                logger.warning("[TTS-INLINE] Failed for '%s': %s", sent[:40], exc)
        return events

    try:
        for chunk in chunk_iter:
            # Dict sentinels from the service layer are forwarded as their own events.
            if isinstance(chunk, dict) and "_activity" in chunk:
                yield f"data: {json.dumps({'activity': chunk['_activity']})}\n\n"
                continue
            if isinstance(chunk, dict) and "_search_results" in chunk:
                yield f"data: {json.dumps({'search_results': chunk['_search_results']})}\n\n"
                continue
            if not chunk:
                continue

            yield f"data: {json.dumps({'chunk': chunk, 'done': False})}\n\n"

            if not tts_enabled:
                continue

            for ev in _drain_ready():
                yield ev

            buffer += chunk
            sentences, buffer = _split_sentences(buffer)
            sentences = _merge_short(sentences)

            # A short new first sentence is absorbed into the held-back one.
            if held and sentences and len(sentences[0].split()) <= _MERGE_IF_WORDS:
                held = (held + " " + sentences[0]).strip()
                sentences = sentences[1:]

            for i, sent in enumerate(sentences):
                min_w = _MIN_WORDS_FIRST if is_first else _MIN_WORDS
                if len(sent.split()) < min_w:
                    continue
                is_last = (i == len(sentences) - 1)
                if held:
                    _submit(held)
                    held = None
                    is_first = False
                if is_last:
                    # Hold the newest sentence back in case the next chunk extends it.
                    held = sent
                else:
                    _submit(sent)
                    is_first = False

    except Exception as e:
        # Generation failed mid-stream: cancel pending audio and signal the client.
        for fut, _ in audio_queue:
            fut.cancel()
        yield f"data: {json.dumps({'chunk': '', 'done': True, 'error': str(e)})}\n\n"
        return

    if tts_enabled:
        # Flush whatever is left: held sentence and/or unterminated buffer tail.
        remaining = buffer.strip()
        if held:
            if remaining and len(remaining.split()) <= _MERGE_IF_WORDS:
                _submit((held + " " + remaining).strip())
            else:
                _submit(held)
                if remaining:
                    _submit(remaining)
        elif remaining:
            _submit(remaining)

        # Block (bounded) on the remaining futures, still in sentence order.
        for fut, sent in audio_queue:
            try:
                audio = fut.result(timeout=15)
                b64 = base64.b64encode(audio).decode("ascii")
                yield f"data: {json.dumps({'audio': b64, 'sentence': sent})}\n\n"
            except FuturesTimeoutError:
                logger.warning("[TTS-INLINE] Timeout for '%s' (15s)", (sent or "")[:40])
            except Exception as exc:
                logger.warning("[TTS-INLINE] Failed for '%s': %s", (sent or "")[:40], exc)

    yield f"data: {json.dumps({'chunk': '', 'done': True, 'session_id': session_id})}\n\n"
369
+
370
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """General chat as Server-Sent Events; emits inline TTS audio when request.tts is set."""
    if not chat_service:
        raise HTTPException(status_code=503, detail="Chat service not initialized")
    logger.info("[API /chat/stream] Incoming | session_id=%s | message_len=%d | message=%.100s",
                request.session_id or "new", len(request.message), request.message)

    try:
        session_id = chat_service.get_or_create_session(request.session_id)
        chunk_iter = chat_service.process_message_stream(session_id, request.message)
        # X-Accel-Buffering: no — stops nginx-style reverse proxies from buffering SSE.
        return StreamingResponse(
            _stream_generator(session_id, chunk_iter, is_realtime=False, tts_enabled=request.tts),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except AllGroqApisFailedError as e:
        raise HTTPException(status_code=503, detail=str(e))
    except Exception as e:
        if _is_rate_limit_error(e):
            raise HTTPException(status_code=429, detail=RATE_LIMIT_MESSAGE)
        logger.error("[API /chat/stream] Error: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
394
+
395
@app.post("/chat/realtime", response_model=ChatResponse)
async def chat_realtime(request: ChatRequest):
    """Non-streaming realtime chat (web-search-augmented path)."""
    if not chat_service:
        raise HTTPException(status_code=503, detail="Chat service not initialized")
    if not realtime_service:
        raise HTTPException(status_code=503, detail="Realtime service not initialized")

    logger.info("[API /chat/realtime] Incoming | session_id=%s | message_len=%d | message=%.100s",
                request.session_id or "new", len(request.message), request.message)
    try:
        session_id = chat_service.get_or_create_session(request.session_id)
        response_text = chat_service.process_realtime_message(session_id, request.message)
        # Persist the turn immediately so history survives a restart.
        chat_service.save_chat_session(session_id)
        logger.info("[API /chat/realtime] Done | session_id=%s | response_len=%d", session_id[:12], len(response_text))
        return ChatResponse(response=response_text, session_id=session_id)
    except ValueError as e:
        logger.warning("[API /chat/realtime] Invalid session_id: %s", e)
        raise HTTPException(status_code=400, detail=str(e))
    except AllGroqApisFailedError as e:
        logger.error("[API /chat/realtime] All Groq APIs failed: %s", e)
        raise HTTPException(status_code=503, detail=str(e))
    except Exception as e:
        # Quota errors map to 429 with a friendly message; everything else is 500.
        if _is_rate_limit_error(e):
            logger.warning("[API /chat/realtime] Rate limit hit: %s", e)
            raise HTTPException(status_code=429, detail=RATE_LIMIT_MESSAGE)
        logger.error("[API /chat/realtime] Error: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Error processing chat: {str(e)}")
422
+
423
@app.post("/chat/realtime/stream")
async def chat_realtime_stream(request: ChatRequest):
    """Realtime (search-augmented) chat as Server-Sent Events; optional inline TTS."""
    if not chat_service or not realtime_service:
        raise HTTPException(status_code=503, detail="Service not initialized")
    logger.info("[API /chat/realtime/stream] Incoming | session_id=%s | message_len=%d | message=%.100s",
                request.session_id or "new", len(request.message), request.message)
    try:
        session_id = chat_service.get_or_create_session(request.session_id)
        chunk_iter = chat_service.process_realtime_message_stream(session_id, request.message)
        return StreamingResponse(
            _stream_generator(session_id, chunk_iter, is_realtime=True, tts_enabled=request.tts),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except AllGroqApisFailedError as e:
        raise HTTPException(status_code=503, detail=str(e))
    except Exception as e:
        if _is_rate_limit_error(e):
            raise HTTPException(status_code=429, detail=RATE_LIMIT_MESSAGE)
        logger.error("[API /chat/realtime/stream] Error: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
446
+
447
@app.post("/chat/jarvis/stream")
async def chat_jarvis_stream(request: ChatRequest):
    """Unified route: the brain service picks general vs realtime, then streams SSE."""
    if not chat_service:
        raise HTTPException(status_code=503, detail="Service not initialized")
    logger.info("[API /chat/jarvis/stream] Incoming | session_id=%s | message_len=%d | message=%.100s",
                request.session_id or "new", len(request.message), request.message)
    try:
        session_id = chat_service.get_or_create_session(request.session_id)
        chunk_iter = chat_service.process_jarvis_message_stream(session_id, request.message)
        return StreamingResponse(
            _stream_generator(session_id, chunk_iter, is_realtime=True, tts_enabled=request.tts),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except AllGroqApisFailedError as e:
        raise HTTPException(status_code=503, detail=str(e))
    except Exception as e:
        if _is_rate_limit_error(e):
            raise HTTPException(status_code=429, detail=RATE_LIMIT_MESSAGE)
        logger.error("[API /chat/jarvis/stream] Error: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
470
+
471
@app.get("/chat/history/{session_id}")
async def get_chat_history(session_id: str):
    """Return the stored conversation for *session_id* as plain role/content dicts."""
    if not chat_service:
        raise HTTPException(status_code=503, detail="Chat service not initialized")
    # Reject malformed ids before touching storage.
    if not chat_service.validate_session_id(session_id):
        raise HTTPException(status_code=400, detail="Invalid session_id format")

    try:
        messages = chat_service.get_chat_history(session_id)
        return {
            "session_id": session_id,
            "messages": [{"role": msg.role, "content": msg.content} for msg in messages]
        }
    except Exception as e:
        logger.error(f"Error retrieving history: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Error retrieving history: {str(e)}")
487
+
488
@app.post("/tts")
async def text_to_speech(request: TTSRequest):
    """Stream MP3 audio for *request.text* via edge-tts.

    Returns a chunked ``audio/mpeg`` StreamingResponse; raises 400 for
    empty/whitespace text. Synthesis errors are logged and end the stream
    early (headers are already sent) rather than failing the whole response.
    """
    text = request.text.strip()
    if not text:
        raise HTTPException(status_code=400, detail="Text is required")

    async def generate():
        try:
            communicate = edge_tts.Communicate(text=text, voice=TTS_VOICE, rate=TTS_RATE)
            async for chunk in communicate.stream():
                if chunk["type"] == "audio":
                    yield chunk["data"]
        except Exception as e:
            # BUGFIX: was logger.error(f"...: %s", e) — a spurious f-prefix on a
            # %-style format string; use lazy %-formatting like the rest of the file.
            logger.error("[TTS] Error generating speech: %s", e)

    return StreamingResponse(
        generate(),
        media_type="audio/mpeg",
        headers={"Cache-Control": "no-cache"},
    )
508
+
509
# Serve the bundled frontend (if present) at "/". Mounted after the routes above
# so API paths take precedence over static files.
_frontend_dir = Path(__file__).resolve().parent.parent / "frontend"
if _frontend_dir.exists():
    app.mount("/", StaticFiles(directory=str(_frontend_dir), html=True), name="frontend")
512
+
513
def run():
    """Start the development server (uvicorn, auto-reload) on 0.0.0.0:8000."""
    # String app path ("app.main:app") is required for reload=True to work.
    uvicorn.run(
        "app.main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_level="info"
    )

if __name__ == "__main__":
    run()
app/models.py ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pydantic import BaseModel, Field
2
+ from typing import List, Optional
3
+
4
class ChatMessage(BaseModel):
    """One stored turn of a conversation."""

    # Speaker tag (presumably "user"/"assistant" — confirm against ChatService).
    role: str
    # Raw message text.
    content: str
7
+
8
class ChatRequest(BaseModel):
    """Request body shared by all /chat* endpoints."""

    # User message; length-bounded at the API edge.
    message: str = Field(..., min_length=1, max_length=32_000)
    # Existing session to continue; None lets the server create a new one.
    session_id: Optional[str] = None
    # When True, streaming endpoints also emit inline base64 MP3 audio events.
    tts: bool = False
12
+
13
class ChatResponse(BaseModel):
    """Non-streaming chat reply: full text plus the session it belongs to."""

    response: str
    session_id: str
16
+
17
class ChatHistory(BaseModel):
    """A session's full ordered message list (used for on-disk persistence)."""

    session_id: str
    messages: List[ChatMessage]
20
+
21
class TTSRequest(BaseModel):
    """Request body for /tts."""

    # Text to synthesize; bounded to keep a single clip reasonably sized.
    text: str = Field(..., min_length=1, max_length=5000)
app/services/__init__.py ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ SERVICES PACKAGE
3
+ ================
4
+
5
+ Business logic lives here. The API layer (app.main) calls these services;
6
+ they do not handle HTTP, only chat flow, LLM calls, and data.
7
+
8
+ MODULES:
9
+ brain_service - Groq-based classifier that routes each query to general or realtime handling.
+ chat_service - Sessions (get/create, load from disk), message list, format history for LLM, save to disk.
10
+ groq_service - General chat: retrieve context from vector store, build prompt, call Groq LLM.
11
+ realtime_service - Realtime chat: Tavily search first, then same as groq (inherits GroqService).
12
+ vector_store - Load learning_data + chats_data, chunk, embed, FAISS index; provide retriever for context.
13
+ """
app/services/__pycache__/__init__.cpython-312.pyc ADDED
Binary file (759 Bytes). View file
 
app/services/__pycache__/brain_service.cpython-312.pyc ADDED
Binary file (6.71 kB). View file
 
app/services/__pycache__/chat_service.cpython-312.pyc ADDED
Binary file (21.1 kB). View file
 
app/services/__pycache__/groq_service.cpython-312.pyc ADDED
Binary file (13.4 kB). View file
 
app/services/__pycache__/realtime_service.cpython-312.pyc ADDED
Binary file (14.4 kB). View file
 
app/services/__pycache__/vector_store.cpython-312.pyc ADDED
Binary file (7.32 kB). View file
 
app/services/brain_service.py ADDED
@@ -0,0 +1,105 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import re
3
+ import time
4
+ from typing import List, Optional, Tuple, Literal
5
+
6
+ from config import GROQ_API_KEYS, GROQ_BRAIN_MODEL
7
+
8
+ logger = logging.getLogger("J.A.R.V.I.S")
9
+
10
+ QueryType = Literal["general", "realtime"]
11
+ MAX_CONTEXT_TURNS = 6
12
+ MAX_MESSAGE_PREVIEW = 500
13
+ REASONING_GENERAL = "Answerable from knowledge and context"
14
+ REASONING_REALTIME = "Needs live web search"
15
+ REASONING_DEFAULT = "Brain unavailable; defaulting to realtime"
16
+ REASONING_UNCLEAR = "Unclear; defaulting to realtime"
17
+
18
+ _BRAIN_SYSTEM_PROMPT = """You are a query classifier for an AI assistant. Your ONLY job is to decide whether a user's message needs LIVE WEB SEARCH or not.
19
+
20
+ Output EXACTLY one word: either "general" or "realtime".
21
+
22
+ - general: ONLY questions that are purely from static knowledge, learning data, or conversation. Examples: "Tell me a joke", "What did I ask you before?", "Open YouTube", "Write a poem about cats", "How do I improve my coding?", "What is the capital of France?", casual chit-chat. NO questions about people, current events, or things that could change.
23
+
24
+ - realtime: ALWAYS use realtime for:
25
+ * ANY question about a person (famous or not): "Who is Elon Musk?", "Tell me about [person]", "What is [name] known for?", "Who is that actor?" — the LLM has no real-time data; web search finds current info and may find info on lesser-known people.
26
+ * Anything that could have changed: news, weather, stock prices, sports scores, elections, "latest", "current", "today", "recent", "now".
27
+ * Factual lookups where real-time data would be better: events, companies, products, releases, versions.
28
+
29
+ STRONG RULE: If the question is about a person (who, what, tell me about, etc.) → ALWAYS "realtime". The LLM cannot know current facts; web search can.
30
+
31
+ When in doubt, prefer "realtime" — it's better to search when not needed than to miss current information.
32
+
33
+ Output ONLY the word. No explanation, no punctuation, no other text."""
34
+
35
+ class BrainService:
36
+ def __init__(self):
37
+ self._llms = []
38
+ if GROQ_API_KEYS:
39
+ try:
40
+ from langchain_groq import ChatGroq
41
+ self._llms = [
42
+ ChatGroq(
43
+ groq_api_key=key,
44
+ model_name=GROQ_BRAIN_MODEL,
45
+ temperature=0.0,
46
+ max_tokens=20,
47
+ request_timeout=10,
48
+ )
49
+ for key in GROQ_API_KEYS
50
+ ]
51
+ logger.info("[BRAIN] Groq brain initialized (model: %s) with %d key(s)", GROQ_BRAIN_MODEL, len(self._llms))
52
+ except Exception as e:
53
+ logger.warning("[BRAIN] Failed to create Groq brain: %s", e)
54
+ if not self._llms:
55
+ logger.warning("[BRAIN] No API keys. Classification will default to realtime.")
56
+
57
+ def classify(
58
+ self,
59
+ user_message: str,
60
+ chat_history: Optional[List[Tuple[str, str]]] = None,
61
+ key_index: int = 0,
62
+ ) -> Tuple[QueryType, str, int]:
63
+ if not self._llms:
64
+ return ("realtime", REASONING_DEFAULT, 0)
65
+
66
+ context_lines = []
67
+ if chat_history:
68
+ for u, a in chat_history[-MAX_CONTEXT_TURNS:]:
69
+ u_preview = (u or "")[:MAX_MESSAGE_PREVIEW] + ("..." if len(u or "") > MAX_MESSAGE_PREVIEW else "")
70
+ a_preview = (a or "")[:MAX_MESSAGE_PREVIEW] + ("..." if len(a or "") > MAX_MESSAGE_PREVIEW else "")
71
+ context_lines.append(f"User: {u_preview}")
72
+ context_lines.append(f"Assistant: {a_preview}")
73
+ context_block = "\n".join(context_lines) if context_lines else "(No prior conversation)"
74
+ msg_preview = (user_message or "")[:MAX_MESSAGE_PREVIEW]
75
+ user_content = f"""Conversation so far:
76
+ {context_block}
77
+
78
+ Current user message: {msg_preview}
79
+
80
+ Classify the current message. Output ONLY: general or realtime"""
81
+
82
+ t0 = time.perf_counter()
83
+ try:
84
+ from langchain_core.messages import SystemMessage, HumanMessage
85
+ idx = key_index % len(self._llms)
86
+ llm = self._llms[idx]
87
+ response = llm.invoke([
88
+ SystemMessage(content=_BRAIN_SYSTEM_PROMPT),
89
+ HumanMessage(content=user_content),
90
+ ])
91
+ text = (response.content or "").strip().lower()
92
+ except Exception as e:
93
+ elapsed_ms = int((time.perf_counter() - t0) * 1000)
94
+ logger.warning("[BRAIN] Groq error after %d ms: %s. Defaulting to realtime.", elapsed_ms, e)
95
+ return ("realtime", f"API error: {str(e)[:60]}", elapsed_ms)
96
+
97
+ elapsed_ms = int((time.perf_counter() - t0) * 1000)
98
+ if re.search(r"\brealtime\b", text):
99
+ logger.info("[BRAIN] Groq (key #%d) returned realtime in %d ms", key_index + 1, elapsed_ms)
100
+ return ("realtime", REASONING_REALTIME, elapsed_ms)
101
+ if re.search(r"\bgeneral\b", text):
102
+ logger.info("[BRAIN] Groq (key #%d) returned general in %d ms", key_index + 1, elapsed_ms)
103
+ return ("general", REASONING_GENERAL, elapsed_ms)
104
+ logger.warning("[BRAIN] Unexpected output: %r in %d ms. Defaulting to realtime.", text[:100], elapsed_ms)
105
+ return ("realtime", REASONING_UNCLEAR, elapsed_ms)
app/services/chat_service.py ADDED
@@ -0,0 +1,346 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import logging
3
+ import time
4
+ from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
5
+ from pathlib import Path
6
+ from typing import List, Optional, Dict, Iterator, Any, Union
7
+ import uuid
8
+ import threading
9
+
10
+ from config import CHATS_DATA_DIR, MAX_CHAT_HISTORY_TURNS, GROQ_API_KEYS
11
+ from app.models import ChatMessage, ChatHistory
12
+ from app.services.groq_service import GroqService
13
+ from app.services.realtime_service import RealtimeGroqService
14
+ from app.services.brain_service import BrainService
15
+ from app.utils.key_rotation import get_next_key_pair
16
+
17
+ logger = logging.getLogger("J.A.R.V.I.S")
18
+
19
# Max seconds to wait on the brain-classification / web-search futures in
# process_jarvis_message_stream before defaulting to the realtime route.
JARVIS_BRAIN_SEARCH_TIMEOUT = 15
# During streaming, persist the session JSON every N content chunks.
SAVE_EVERY_N_CHUNKS = 5
21
+
22
class ChatService:
    """Session store and message router for J.A.R.V.I.S chat.

    Holds per-session message history in memory (``self.sessions``), mirrors it
    to JSON files under ``CHATS_DATA_DIR``, and dispatches user messages to the
    general (GroqService), realtime (RealtimeGroqService), or auto-routed
    "jarvis" (BrainService classifier + speculative web search) pipelines, in
    both blocking and streaming variants.
    """

    def __init__(
        self,
        groq_service: GroqService,
        realtime_service: RealtimeGroqService = None,
        brain_service: BrainService = None,
    ):
        """Store service dependencies and initialize empty session state.

        Args:
            groq_service: General-chat LLM service (required).
            realtime_service: Optional web-search-augmented service; realtime
                methods raise ValueError when it is None.
            brain_service: Optional general/realtime classifier used by
                process_jarvis_message_stream.
        """
        self.groq_service = groq_service
        self.realtime_service = realtime_service
        self.brain_service = brain_service
        # session_id -> ordered ChatMessage list (roles "user"/"assistant").
        self.sessions: Dict[str, List[ChatMessage]] = {}
        # Serializes JSON writes in save_chat_session (streams save mid-flight).
        self._save_lock = threading.Lock()

    def load_session_from_disk(self, session_id: str) -> bool:
        """Load a previously saved session JSON file into memory.

        Returns:
            True when the file existed and parsed; False when missing or on
            any load error (logged, never raised). Malformed message entries
            are skipped or coerced instead of failing the whole load.
        """
        # Filename sanitization must stay in sync with save_chat_session.
        safe_session_id = session_id.replace("-", "").replace(" ", "_")
        filename = f"chat_{safe_session_id}.json"
        filepath = CHATS_DATA_DIR / filename

        if not filepath.exists():
            return False
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                chat_dict = json.load(f)
            messages = []
            for msg in chat_dict.get("messages", []):
                if not isinstance(msg, dict):
                    continue
                # Coerce unknown roles to "user" and non-string content to str.
                role = msg.get("role")
                role = role if role in ("user", "assistant") else "user"
                content = msg.get("content")
                content = content if isinstance(content, str) else str(content or "")
                messages.append(ChatMessage(role=role, content=content))
            self.sessions[session_id] = messages
            return True
        except Exception as e:
            logger.warning("Failed to load session %s from disk: %s", session_id, e)
            return False

    def validate_session_id(self, session_id: str) -> bool:
        """Return True when session_id is safe to embed in a filename.

        Rejects empty/whitespace-only ids, NUL bytes, path-traversal
        characters ("..", "/", "\\"), and ids longer than 255 characters.
        """
        if not session_id or not session_id.strip():
            return False
        if "\0" in session_id:
            return False
        if ".." in session_id or "/" in session_id or "\\" in session_id:
            return False
        if len(session_id) > 255:
            return False
        return True

    def get_or_create_session(self, session_id: Optional[str] = None) -> str:
        """Resolve a session id to an in-memory session, creating one if needed.

        Lookup order: generate a fresh UUID when no id is given; otherwise
        validate, then check memory, then disk, then create an empty session
        under the caller-supplied id.

        Raises:
            ValueError: if the supplied id fails validate_session_id.
        """
        t0 = time.perf_counter()
        if not session_id:
            new_session_id = str(uuid.uuid4())
            self.sessions[new_session_id] = []
            logger.info("[TIMING] session_get_or_create: %.3fs (new)", time.perf_counter() - t0)
            return new_session_id
        if not self.validate_session_id(session_id):
            raise ValueError(
                f"Invalid session_id format: {session_id}. Session ID must be non-empty, "
                "not contain path traversal characters, and be under 255 characters."
            )

        if session_id in self.sessions:
            logger.info("[TIMING] session_get_or_create: %.3fs (memory)", time.perf_counter() - t0)
            return session_id

        if self.load_session_from_disk(session_id):
            logger.info("[TIMING] session_get_or_create: %.3fs (disk)", time.perf_counter() - t0)
            return session_id

        self.sessions[session_id] = []
        logger.info("[TIMING] session_get_or_create: %.3fs (new_id)", time.perf_counter() - t0)
        return session_id

    def add_message(self, session_id: str, role: str, content: str):
        """Append a message to the session, creating the session if absent."""
        if session_id not in self.sessions:
            self.sessions[session_id] = []
        self.sessions[session_id].append(ChatMessage(role=role, content=content))

    def get_chat_history(self, session_id: str) -> List[ChatMessage]:
        """Return the session's message list, or [] for an unknown session."""
        return self.sessions.get(session_id, [])

    def format_history_for_llm(self, session_id: str, exclude_last: bool = False) -> List[tuple]:
        """Convert stored messages into (user, assistant) content pairs.

        Only adjacent user->assistant pairs are kept; stray messages are
        skipped. Pass exclude_last=True when the newest (still unanswered)
        message has already been appended. Output is capped to the most
        recent MAX_CHAT_HISTORY_TURNS pairs.
        """
        messages = self.get_chat_history(session_id)
        history = []
        messages_to_process = messages[:-1] if exclude_last and messages else messages

        i = 0
        while i < len(messages_to_process) - 1:
            user_msg = messages_to_process[i]
            ai_msg = messages_to_process[i + 1]
            if user_msg.role == "user" and ai_msg.role == "assistant":
                u_content = user_msg.content if isinstance(user_msg.content, str) else str(user_msg.content or "")
                a_content = ai_msg.content if isinstance(ai_msg.content, str) else str(ai_msg.content or "")
                history.append((u_content, a_content))
                i += 2
            else:
                # Not a clean user/assistant pair - advance one and resync.
                i += 1

        if len(history) > MAX_CHAT_HISTORY_TURNS:
            history = history[-MAX_CHAT_HISTORY_TURNS:]
        return history

    def process_message(self, session_id: str, user_message: str) -> str:
        """Blocking general-chat turn: record, query GroqService, record reply."""
        logger.info("[GENERAL] Session: %s | User: %.200s", session_id[:12], user_message)
        self.add_message(session_id, "user", user_message)
        # exclude_last: the message just added must not also appear in history.
        chat_history = self.format_history_for_llm(session_id, exclude_last=True)
        logger.info("[GENERAL] History pairs sent to LLM: %d", len(chat_history))
        _, chat_idx = get_next_key_pair(len(GROQ_API_KEYS), need_brain=False)
        response = self.groq_service.get_response(question=user_message, chat_history=chat_history, key_start_index=chat_idx)
        self.add_message(session_id, "assistant", response)
        logger.info("[GENERAL] Response length: %d chars | Preview: %.120s", len(response), response)
        return response

    def process_realtime_message(self, session_id: str, user_message: str) -> str:
        """Blocking realtime (web-search-augmented) turn.

        Raises:
            ValueError: if no realtime service was provided at construction.
        """
        if not self.realtime_service:
            raise ValueError("Realtime service is not initialized. Cannot process realtime queries.")
        logger.info("[REALTIME] Session: %s | User: %.200s", session_id[:12], user_message)
        self.add_message(session_id, "user", user_message)
        chat_history = self.format_history_for_llm(session_id, exclude_last=True)
        logger.info("[REALTIME] History pairs sent to LLM: %d", len(chat_history))
        _, chat_idx = get_next_key_pair(len(GROQ_API_KEYS), need_brain=False)
        response = self.realtime_service.get_response(question=user_message, chat_history=chat_history, key_start_index=chat_idx)
        self.add_message(session_id, "assistant", response)
        logger.info("[REALTIME] Response length: %d chars | Preview: %.120s", len(response), response)
        return response

    def process_message_stream(
        self, session_id: str, user_message: str
    ) -> Iterator[Union[str, Dict[str, Any]]]:
        """Streaming general-chat turn.

        Yields str content chunks interleaved with dict control events
        ("_activity"). The assistant message is appended empty up front and
        grown chunk-by-chunk; the session is persisted every
        SAVE_EVERY_N_CHUNKS chunks and once more on completion (finally).
        """
        logger.info("[GENERAL-STREAM] Session: %s | User: %.200s", session_id[:12], user_message)
        self.add_message(session_id, "user", user_message)
        self.add_message(session_id, "assistant", "")
        chat_history = self.format_history_for_llm(session_id, exclude_last=True)
        logger.info("[GENERAL-STREAM] History pairs sent to LLM: %d", len(chat_history))

        yield {"_activity": {"event": "query_detected", "message": user_message}}
        yield {"_activity": {"event": "routing", "route": "general"}}
        yield {"_activity": {"event": "streaming_started", "route": "general"}}

        _, chat_idx = get_next_key_pair(len(GROQ_API_KEYS), need_brain=False)
        chunk_count = 0
        t0 = time.perf_counter()
        try:
            for chunk in self.groq_service.stream_response(
                question=user_message, chat_history=chat_history, key_start_index=chat_idx
            ):
                # Dict chunks are control events from the service - pass through.
                if isinstance(chunk, dict):
                    yield chunk
                    continue
                if chunk_count == 0:
                    elapsed_ms = int((time.perf_counter() - t0) * 1000)
                    yield {"_activity": {"event": "first_chunk", "route": "general", "elapsed_ms": elapsed_ms}}
                self.sessions[session_id][-1].content += chunk
                chunk_count += 1
                if chunk_count % SAVE_EVERY_N_CHUNKS == 0:
                    self.save_chat_session(session_id, log_timing=False)
                yield chunk
        finally:
            # Persist whatever was accumulated, even on error/client disconnect.
            final_response = self.sessions[session_id][-1].content
            logger.info("[GENERAL-STREAM] Completed | Chunks: %d | Response length: %d chars", chunk_count, len(final_response))
            self.save_chat_session(session_id)

    def process_realtime_message_stream(
        self, session_id: str, user_message: str
    ) -> Iterator[Union[str, Dict[str, Any]]]:
        """Streaming realtime turn; same chunk protocol as process_message_stream.

        Raises:
            ValueError: if no realtime service was provided at construction.
        """
        if not self.realtime_service:
            raise ValueError("Realtime service is not initialized.")
        logger.info("[REALTIME-STREAM] Session: %s | User: %.200s", session_id[:12], user_message)
        self.add_message(session_id, "user", user_message)
        self.add_message(session_id, "assistant", "")
        chat_history = self.format_history_for_llm(session_id, exclude_last=True)
        logger.info("[REALTIME-STREAM] History pairs sent to LLM: %d", len(chat_history))

        yield {"_activity": {"event": "query_detected", "message": user_message}}
        yield {"_activity": {"event": "routing", "route": "realtime"}}
        yield {"_activity": {"event": "streaming_started", "route": "realtime"}}

        _, chat_idx = get_next_key_pair(len(GROQ_API_KEYS), need_brain=False)
        chunk_count = 0
        t0 = time.perf_counter()
        try:
            for chunk in self.realtime_service.stream_response(
                question=user_message, chat_history=chat_history, key_start_index=chat_idx
            ):
                # Dict chunks are control events from the service - pass through.
                if isinstance(chunk, dict):
                    yield chunk
                    continue
                if chunk_count == 0:
                    elapsed_ms = int((time.perf_counter() - t0) * 1000)
                    yield {"_activity": {"event": "first_chunk", "route": "realtime", "elapsed_ms": elapsed_ms}}
                self.sessions[session_id][-1].content += chunk
                chunk_count += 1
                if chunk_count % SAVE_EVERY_N_CHUNKS == 0:
                    self.save_chat_session(session_id, log_timing=False)
                yield chunk
        finally:
            # Persist whatever was accumulated, even on error/client disconnect.
            final_response = self.sessions[session_id][-1].content
            logger.info("[REALTIME-STREAM] Completed | Chunks: %d | Response length: %d chars", chunk_count, len(final_response))
            self.save_chat_session(session_id)

    def process_jarvis_message_stream(
        self, session_id: str, user_message: str
    ) -> Iterator[Union[str, Dict[str, Any]]]:
        """Auto-routed streaming turn: classify general vs. realtime, then stream.

        Runs the brain classification and the web-search prefetch concurrently;
        the search is speculative and its result is discarded when the message
        is classified "general". Defaults to "realtime" on brain timeout or
        when no brain service is configured. Yields the same str/dict chunk
        protocol as the other streaming methods, plus "decision" and
        "_search_results" events.
        """
        logger.info("[JARVIS-STREAM] Session: %s | User: %.200s", session_id[:12], user_message)
        self.add_message(session_id, "user", user_message)
        self.add_message(session_id, "assistant", "")
        chat_history = self.format_history_for_llm(session_id, exclude_last=True)

        yield {"_activity": {"event": "query_detected", "message": user_message}}

        # One key for the brain classifier, a different one for the main chat.
        brain_idx, chat_idx = get_next_key_pair(len(GROQ_API_KEYS), need_brain=True)

        query_type = "realtime"
        reasoning = "Defaulting to realtime"
        brain_elapsed_ms = 0
        formatted_results = ""
        search_payload = None

        def _run_brain():
            # Classify the message; fall back to realtime when no brain service.
            if self.brain_service and brain_idx is not None:
                qt, r, ms = self.brain_service.classify(user_message, chat_history, key_index=brain_idx)
                return (qt, r, ms)
            return ("realtime", "No brain service", 0)

        def _run_search():
            # Speculative web-search prefetch, overlapped with classification.
            if self.realtime_service:
                return self.realtime_service.prefetch_web_search(user_message, chat_history)
            return ("", None)

        with ThreadPoolExecutor(max_workers=2) as executor:
            future_brain = executor.submit(_run_brain)
            future_search = executor.submit(_run_search)
            try:
                query_type, reasoning, brain_elapsed_ms = future_brain.result(timeout=JARVIS_BRAIN_SEARCH_TIMEOUT)
            except FuturesTimeoutError:
                logger.warning("[JARVIS] Brain classification timed out after %ds, defaulting to realtime", JARVIS_BRAIN_SEARCH_TIMEOUT)
                query_type, reasoning, brain_elapsed_ms = "realtime", "Brain timeout, defaulting to realtime", 0

            if query_type == "general":
                # General route: discard the speculative search result.
                formatted_results, search_payload = "", None
            else:
                try:
                    formatted_results, search_payload = future_search.result(timeout=JARVIS_BRAIN_SEARCH_TIMEOUT)
                except FuturesTimeoutError:
                    logger.warning("[JARVIS] Web search prefetch timed out after %ds", JARVIS_BRAIN_SEARCH_TIMEOUT)
                    formatted_results, search_payload = "", None

        logger.info("[JARVIS] Brain: %s in %d ms - %s", query_type, brain_elapsed_ms, reasoning)

        yield {"_activity": {"event": "decision", "query_type": query_type, "reasoning": reasoning, "elapsed_ms": brain_elapsed_ms}}
        yield {"_activity": {"event": "routing", "route": query_type}}
        if query_type == "realtime" and search_payload:
            yield {"_search_results": search_payload}
        yield {"_activity": {"event": "streaming_started", "route": query_type}}

        chunk_count = 0
        t0 = time.perf_counter()
        try:
            if query_type == "general":
                stream = self.groq_service.stream_response(
                    question=user_message, chat_history=chat_history, key_start_index=chat_idx
                )
            else:
                if not self.realtime_service:
                    raise ValueError("Realtime service not initialized.")
                # Reuse the prefetched results instead of searching again.
                stream = self.realtime_service.stream_response_with_prefetched(
                    question=user_message,
                    chat_history=chat_history,
                    formatted_results=formatted_results,
                    payload=search_payload,
                    key_start_index=chat_idx,
                )

            for chunk in stream:
                # Dict chunks are control events from the service - pass through.
                if isinstance(chunk, dict):
                    yield chunk
                    continue
                if chunk_count == 0:
                    elapsed_ms = int((time.perf_counter() - t0) * 1000)
                    yield {"_activity": {"event": "first_chunk", "route": query_type, "elapsed_ms": elapsed_ms}}
                self.sessions[session_id][-1].content += chunk
                chunk_count += 1
                if chunk_count % SAVE_EVERY_N_CHUNKS == 0:
                    self.save_chat_session(session_id, log_timing=False)
                yield chunk
        finally:
            # Persist whatever was accumulated, even on error/client disconnect.
            final_response = self.sessions[session_id][-1].content
            logger.info("[JARVIS-STREAM] Completed | Route: %s | Chunks: %d | Response length: %d chars",
                        query_type, chunk_count, len(final_response))
            self.save_chat_session(session_id)

    def save_chat_session(self, session_id: str, log_timing: bool = True):
        """Write the session's messages to its JSON file under CHATS_DATA_DIR.

        OSError is retried up to 3 times with a short linear backoff; any
        other exception is logged and abandoned. Writes are serialized via
        self._save_lock because streaming handlers save concurrently.
        """
        if session_id not in self.sessions or not self.sessions[session_id]:
            return

        messages = self.sessions[session_id]
        # Must mirror load_session_from_disk's filename scheme.
        safe_session_id = session_id.replace("-", "").replace(" ", "_")
        filename = f"chat_{safe_session_id}.json"
        filepath = CHATS_DATA_DIR / filename

        chat_dict = {
            "session_id": session_id,
            "messages": [{"role": msg.role, "content": msg.content} for msg in messages]
        }

        max_retries = 3
        last_exc = None
        for attempt in range(max_retries):
            try:
                with self._save_lock:
                    t0 = time.perf_counter() if log_timing else 0
                    # NOTE(review): in-place write (no temp-file + rename); a
                    # crash mid-write can leave a truncated JSON file - confirm
                    # whether atomic replace is wanted here.
                    with open(filepath, "w", encoding="utf-8") as f:
                        json.dump(chat_dict, f, indent=2, ensure_ascii=False)
                    if log_timing:
                        logger.info("[TIMING] save_session_json: %.3fs", time.perf_counter() - t0)
                    return
            except OSError as e:
                last_exc = e
                if attempt < max_retries - 1:
                    time.sleep(0.1 * (attempt + 1))
            except Exception as e:
                logger.error("Failed to save chat session %s to disk: %s", session_id, e)
                return
        logger.error("Failed to save chat session %s after %d retries: %s", session_id, max_retries, last_exc)
app/services/groq_service.py ADDED
@@ -0,0 +1,257 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import List, Optional, Iterator
2
+ from langchain_groq import ChatGroq
3
+ from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
4
+ from langchain_core.messages import HumanMessage, AIMessage
5
+ import logging
6
+ import time
7
+
8
+ from config import (
9
+ GROQ_API_KEYS,
10
+ GROQ_MODEL,
11
+ JARVIS_SYSTEM_PROMPT,
12
+ GENERAL_CHAT_ADDENDUM,
13
+ )
14
+ from app.services.vector_store import VectorStoreService
15
+ from app.utils.time_info import get_time_information
16
+ from app.utils.retry import with_retry
17
+
18
+ logger = logging.getLogger("J.A.R.V.I.S")
19
+
20
# Per-request timeout (seconds) passed to each ChatGroq client.
GROQ_REQUEST_TIMEOUT = 60
# User-facing text carried by AllGroqApisFailedError when every key fails.
ALL_APIS_FAILED_MESSAGE = (
    "I'm unable to process your request at the moment. All API services are "
    "temporarily unavailable. Please try again in a few minutes."
)
25
+
26
class AllGroqApisFailedError(Exception):
    """Raised after every configured Groq API key has been tried and all failed."""
28
+
29
def escape_curly_braces(text: str) -> str:
    """Double every literal brace so *text* is safe inside a prompt template.

    Falsy input (empty string, None) is returned unchanged.
    """
    if not text:
        return text
    # Single-pass char mapping: "{" -> "{{" and "}" -> "}}".
    return text.translate(str.maketrans({"{": "{{", "}": "}}"}))
33
+
34
+ def _is_rate_limit_error(exc: BaseException) -> bool:
35
+ msg = str(exc).lower()
36
+ return "429" in str(exc) or "rate limit" in msg or "tokens per day" in msg
37
+
38
def _log_timing(label: str, elapsed: float, extra: str = "") -> None:
    """Emit a standardized ``[TIMING] label: N.NNNs`` info log.

    When *extra* is non-empty it is appended in parentheses.
    """
    suffix = f" ({extra})" if extra else ""
    logger.info(f"[TIMING] {label}: {elapsed:.3f}s{suffix}")
43
+
44
+ def _mask_api_key(key: str) -> str:
45
+ if not key or len(key) <= 12:
46
+ return "***masked***"
47
+ return f"{key[:8]}...{key[-4:]}"
48
+
49
class GroqService:
    """General-chat LLM service backed by Groq with multi-key fallback.

    Holds one ChatGroq client per configured API key. Every call starts at
    ``key_start_index`` and falls through the remaining keys on failure
    (rate limit or otherwise). Prompts are augmented with vector-store
    context, the current date/time, and a per-mode addendum.
    """

    def __init__(self, vector_store_service: VectorStoreService):
        """Build one ChatGroq client per key in GROQ_API_KEYS.

        Args:
            vector_store_service: Retriever source for prompt context.

        Raises:
            ValueError: when no API keys are configured.
        """
        if not GROQ_API_KEYS:
            raise ValueError(
                "No Groq API keys configured. Set GROQ_API_KEY (and optionally GROQ_API_KEY_2, GROQ_API_KEY_3, ...) in .env"
            )
        self.llms = [
            ChatGroq(
                groq_api_key=key,
                model_name=GROQ_MODEL,
                temperature=0.5,
                request_timeout=GROQ_REQUEST_TIMEOUT,
            )
            for key in GROQ_API_KEYS
        ]
        self.vector_store_service = vector_store_service
        logger.info(f"Initialized GroqService with {len(GROQ_API_KEYS)} API key(s) (primary-first fallback)")

    def _invoke_llm(
        self,
        prompt: ChatPromptTemplate,
        messages: list,
        question: str,
        key_start_index: int = 0,
    ) -> str:
        """Invoke the prompt chain, rotating through API keys until one succeeds.

        Each key gets up to 2 retries via with_retry.

        Raises:
            AllGroqApisFailedError: when every tried key fails (chained to the
                last underlying exception).
        """
        n = len(self.llms)
        last_exc = None
        keys_tried = []

        for j in range(n):
            # Rotate from the caller-chosen starting key.
            i = (key_start_index + j) % n
            keys_tried.append(i)
            masked_key = _mask_api_key(GROQ_API_KEYS[i])
            logger.info(f"Trying API key #{i + 1}/{n}: {masked_key}")

            def _invoke_with_key():
                chain = prompt | self.llms[i]
                return chain.invoke({"history": messages, "question": question})

            try:
                response = with_retry(
                    _invoke_with_key,
                    max_retries=2,
                    initial_delay=0.5,
                )
                if i > 0:
                    logger.info(f"Fallback successful: API key #{i + 1}/{n} succeeded: {masked_key}")
                return response.content
            except Exception as e:
                last_exc = e
                if _is_rate_limit_error(e):
                    logger.warning(f"API key #{i + 1}/{n} rate limited: {masked_key}")
                else:
                    logger.warning(f"API key #{i + 1}/{n} failed: {masked_key} - {str(e)[:100]}")
                # NOTE(review): this compares the ROTATED index i, not the
                # iteration count j. When key_start_index > 0, i hits n-1 on an
                # early iteration and the loop breaks before trying all keys -
                # likely should be `if j < n - 1`. Confirm intended.
                if i < n - 1:
                    logger.info("Falling back to next API key...")
                    continue
                break

        masked_all = ", ".join([_mask_api_key(GROQ_API_KEYS[j]) for j in keys_tried])
        logger.error(f"All {n} API key(s) failed. Tried: {masked_all}")
        raise AllGroqApisFailedError(ALL_APIS_FAILED_MESSAGE) from last_exc

    def _stream_llm(
        self,
        prompt: ChatPromptTemplate,
        messages: list,
        question: str,
        key_start_index: int = 0,
    ) -> Iterator[str]:
        """Stream the prompt chain, rotating through API keys on failure.

        Yields non-empty string content chunks.

        NOTE(review): if a key fails MID-stream after some chunks were already
        yielded, the next key restarts the stream from scratch, so the caller
        may receive duplicated leading content - confirm acceptable.

        Raises:
            AllGroqApisFailedError: when every tried key fails.
        """
        n = len(self.llms)
        last_exc = None

        for j in range(n):
            i = (key_start_index + j) % n
            masked_key = _mask_api_key(GROQ_API_KEYS[i])
            logger.info(f"Streaming with API key #{i + 1}/{n}: {masked_key}")

            try:
                chain = prompt | self.llms[i]
                chunk_count = 0
                first_chunk_time = None
                stream_start = time.perf_counter()

                for chunk in chain.stream({"history": messages, "question": question}):
                    # Chunks may be message objects (with .content) or dicts.
                    content = ""
                    if hasattr(chunk, "content"):
                        content = chunk.content or ""
                    elif isinstance(chunk, dict) and "content" in chunk:
                        content = chunk.get("content", "") or ""

                    if isinstance(content, str) and content:
                        if first_chunk_time is None:
                            first_chunk_time = time.perf_counter() - stream_start
                            _log_timing("first_chunk", first_chunk_time)
                        chunk_count += 1
                        yield content

                total_stream = time.perf_counter() - stream_start
                _log_timing("groq_stream_total", total_stream, f"chunks: {chunk_count}")

                if i > 0 and chunk_count > 0:
                    logger.info(f"Fallback successful: API key #{i + 1}/{n} streamed: {masked_key}")
                # Stream completed (even with zero chunks) - done.
                return

            except Exception as e:
                last_exc = e
                if _is_rate_limit_error(e):
                    logger.warning(f"API key #{i + 1}/{n} rate limited: {masked_key}")
                else:
                    logger.warning(f"API key #{i + 1}/{n} failed: {masked_key} - {str(e)[:100]}")
                # NOTE(review): same rotated-index early-break as _invoke_llm
                # (`i < n - 1` vs. `j < n - 1`) - confirm intended.
                if i < n - 1:
                    logger.info("Falling back to next API key for stream...")
                    continue
                break

        logger.error(f"All {n} API key(s) failed during stream.")
        raise AllGroqApisFailedError(ALL_APIS_FAILED_MESSAGE) from last_exc

    def _build_prompt_and_messages(
        self,
        question: str,
        chat_history: Optional[List[tuple]] = None,
        extra_system_parts: Optional[List[str]] = None,
        mode_addendum: str = "",
    ) -> tuple:
        """Assemble the (prompt template, history messages) pair for a call.

        The system message is JARVIS_SYSTEM_PROMPT plus current time, optional
        vector-store context (retrieval failure degrades to empty context),
        any extra system parts (e.g. web-search results), and a mode addendum.

        Returns:
            (ChatPromptTemplate, list of Human/AI message objects).
        """
        context = ""
        context_sources = []
        t0 = time.perf_counter()
        try:
            retriever = self.vector_store_service.get_retriever(k=10)
            context_docs = retriever.invoke(question)
            if context_docs:
                context = "\n".join([doc.page_content for doc in context_docs])
                context_sources = [doc.metadata.get("source", "unknown") for doc in context_docs]
                logger.info("[CONTEXT] Retrieved %d chunks from sources: %s", len(context_docs), context_sources)
            else:
                logger.info("[CONTEXT] No relevant chunks found for query")
        except Exception as retrieval_err:
            # Retrieval is best-effort: a failed vector store must not block chat.
            logger.warning("Vector store retrieval failed, using empty context: %s", retrieval_err)
        finally:
            _log_timing("vector_db", time.perf_counter() - t0)

        time_info = get_time_information()
        system_message = JARVIS_SYSTEM_PROMPT
        system_message += f"\n\nCurrent time and date: {time_info}"  # Layer 2: time awareness

        if context:
            # Braces must be escaped so retrieved text can't break the template.
            system_message += f"\n\nRelevant context from your learning data and past conversations:\n{escape_curly_braces(context)}"

        if extra_system_parts:
            system_message += "\n\n" + "\n\n".join(extra_system_parts)

        if mode_addendum:
            system_message += f"\n\n{mode_addendum}"

        prompt = ChatPromptTemplate.from_messages([
            ("system", system_message),
            MessagesPlaceholder(variable_name="history"),
            ("human", "{question}"),
        ])

        messages = []
        if chat_history:
            for human_msg, ai_msg in chat_history:
                messages.append(HumanMessage(content=human_msg))
                messages.append(AIMessage(content=ai_msg))

        logger.info("[PROMPT] System message length: %d chars | History pairs: %d | Question: %.100s",
                    len(system_message), len(chat_history) if chat_history else 0, question)

        return prompt, messages

    def get_response(
        self,
        question: str,
        chat_history: Optional[List[tuple]] = None,
        key_start_index: int = 0,
    ) -> str:
        """Blocking general-chat completion.

        Raises:
            AllGroqApisFailedError: propagated unchanged when all keys fail.
            Exception: any other error, wrapped with a descriptive message.
        """
        try:
            prompt, messages = self._build_prompt_and_messages(
                question, chat_history, mode_addendum=GENERAL_CHAT_ADDENDUM,
            )
            t0 = time.perf_counter()
            result = self._invoke_llm(prompt, messages, question, key_start_index=key_start_index)
            _log_timing("groq_api", time.perf_counter() - t0)
            logger.info("[RESPONSE] General chat | Length: %d chars | Preview: %.120s", len(result), result)
            return result
        except AllGroqApisFailedError:
            raise
        except Exception as e:
            raise Exception(f"Error getting response from Groq: {str(e)}") from e

    def stream_response(
        self,
        question: str,
        chat_history: Optional[List[tuple]] = None,
        key_start_index: int = 0,
    ) -> Iterator[str]:
        """Streaming general-chat completion.

        NOTE(review): despite the Iterator[str] annotation, the first yield is
        a dict "_activity" control event; callers (ChatService streams) filter
        dicts explicitly - consider widening the annotation.

        Raises:
            AllGroqApisFailedError: propagated unchanged when all keys fail.
            Exception: any other error, wrapped with a descriptive message.
        """
        try:
            prompt, messages = self._build_prompt_and_messages(
                question, chat_history, mode_addendum=GENERAL_CHAT_ADDENDUM,
            )
            yield {"_activity": {"event": "context_retrieved", "message": "Retrieved relevant context from knowledge base"}}
            yield from self._stream_llm(prompt, messages, question, key_start_index=key_start_index)
        except AllGroqApisFailedError:
            raise
        except Exception as e:
            raise Exception(f"Error streaming response from Groq: {str(e)}") from e
app/services/realtime_service.py ADDED
@@ -0,0 +1,277 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import List, Optional, Iterator, Tuple, Any
2
+ from tavily import TavilyClient
3
+ import logging
4
+ import os
5
+ import time
6
+
7
+ from app.services.groq_service import GroqService, escape_curly_braces, AllGroqApisFailedError
8
+ from app.services.vector_store import VectorStoreService
9
+ from app.utils.retry import with_retry
10
+ from config import REALTIME_CHAT_ADDENDUM, GROQ_API_KEYS, GROQ_MODEL
11
+
12
+ logger = logging.getLogger("J.A.R.V.I.S")
13
+
14
# Shorter timeout (seconds) for the small "fast" LLM used for query extraction.
GROQ_REQUEST_TIMEOUT_FAST = 15

# Prompt for the fast LLM that condenses the user's message (plus recent
# history, to resolve pronouns/references) into a short web-search query.
_QUERY_EXTRACTION_PROMPT = (
    "You are a search query optimizer. Given the user's message and recent conversation, "
    "produce a single short, focused web search query (max 12 words) that will find the "
    "information the user needs. Resolve any references (like 'that website', 'him', 'it') "
    "using the conversation history. Output ONLY the search query, nothing else."
)
22
+
23
+ class RealtimeGroqService(GroqService):
24
    def __init__(self, vector_store_service: VectorStoreService):
        """Initialize the base GroqService, the Tavily client, and a fast LLM.

        The Tavily client is only created when TAVILY_API_KEY is set; without
        it, search methods degrade to empty results. The "fast" LLM (first API
        key, temperature 0, max 50 tokens, short timeout) is used only for
        search-query extraction.
        """
        super().__init__(vector_store_service)

        tavily_api_key = os.getenv("TAVILY_API_KEY", "")
        if tavily_api_key:
            self.tavily_client = TavilyClient(api_key=tavily_api_key)
            logger.info("Tavily search client initialized successfully")
        else:
            # Search methods check for None and return empty results.
            self.tavily_client = None
            logger.warning("TAVILY_API_KEY not set. Realtime search will be unavailable.")

        if GROQ_API_KEYS:
            from langchain_groq import ChatGroq
            self._fast_llm = ChatGroq(
                groq_api_key=GROQ_API_KEYS[0],
                model_name=GROQ_MODEL,
                temperature=0.0,
                request_timeout=GROQ_REQUEST_TIMEOUT_FAST,
                max_tokens=50,
            )
        else:
            self._fast_llm = None
46
+
47
    def _extract_search_query(
        self, question: str, chat_history: Optional[List[tuple]] = None
    ) -> str:
        """Condense the user's message into a short web-search query.

        Short messages with no reference words skip the LLM entirely. On any
        extraction failure, or when the extracted text fails sanity bounds,
        the raw question is returned so the search still proceeds.
        """
        if not self._fast_llm:
            return question

        q = question.strip()
        # Fast path: short message with nothing to resolve - use it verbatim.
        # NOTE(review): the space-padded tokens miss pronouns at the very start
        # or end of the message (e.g. "who is he") - confirm intended.
        if len(q) <= 60 and not any(p in q.lower() for p in (" it ", " that ", " him ", " her ", " them ")):
            return q

        try:
            t0 = time.perf_counter()
            history_context = ""
            if chat_history:
                # Last 3 turns, each side truncated to 200 chars.
                recent = chat_history[-3:]
                parts = []
                for h, a in recent:
                    parts.append(f"User: {h[:200]}")
                    parts.append(f"Assistant: {a[:200]}")
                history_context = "\n".join(parts)

            if history_context:
                full_prompt = (
                    f"{_QUERY_EXTRACTION_PROMPT}\n\n"
                    f"Recent conversation:\n{history_context}\n\n"
                    f"User's latest message: {question}\n\n"
                    f"Search query:"
                )
            else:
                full_prompt = (
                    f"{_QUERY_EXTRACTION_PROMPT}\n\n"
                    f"User's message: {question}\n\n"
                    f"Search query:"
                )

            response = self._fast_llm.invoke(full_prompt)
            # Strip whitespace plus any quoting the model wrapped the query in.
            extracted = response.content.strip().strip("'").strip('"')

            # Sanity bounds: discard empty or implausibly short/long extractions.
            if extracted and 3 <= len(extracted) <= 200:
                logger.info(
                    "[REALTIME] Query extraction: '%s' -> '%s' (%.3fs)",
                    question[:80], extracted[:80], time.perf_counter() - t0,
                )
                return extracted

            logger.warning("[REALTIME] Query extraction returned unusable result, using raw question")
            return question

        except Exception as e:
            logger.warning("[REALTIME] Query extraction failed (%s), using raw question", e)
            return question
98
+
99
    def search_tavily(self, query: str, num_results: int = 7) -> Tuple[str, Optional[dict]]:
        """Run a Tavily web search and format the results for the LLM prompt.

        Returns:
            (formatted_text, payload): formatted_text is the prompt-ready
            block ("--- WEB SEARCH RESULTS ... === END SEARCH RESULTS ===");
            payload is a JSON-safe dict (query, answer, truncated results) for
            emission to the client. Returns ("", None) when the client is
            missing, the query is blank, nothing was found, or any error
            occurred (errors are logged, never raised).
        """
        if not self.tavily_client:
            logger.warning("Tavily client not initialized. TAVILY_API_KEY not set.")
            return ("", None)
        if not query or not str(query).strip():
            return ("", None)

        try:
            t0 = time.perf_counter()

            response = with_retry(
                lambda: self.tavily_client.search(
                    query=query,
                    search_depth="fast",
                    max_results=num_results,
                    include_answer=True,
                    include_raw_content=False,
                ),
                max_retries=3,
                initial_delay=1.0,
            )

            results = response.get("results", [])
            ai_answer = response.get("answer", "")

            if not results and not ai_answer:
                logger.warning("No Tavily search results for query: %s", query)
                return ("", None)

            # Client-facing payload: content truncated to 500 chars per result.
            payload: Optional[dict] = {
                "query": query,
                "answer": ai_answer,
                "results": [
                    {
                        "title": r.get("title", "No title"),
                        "content": (r.get("content") or "")[:500],
                        "url": r.get("url", ""),
                        "score": round(float(r.get("score", 0)), 2),
                    }
                    for r in results[:num_results]
                ],
            }

            # Prompt-facing text: AI answer first, then per-source details.
            parts = [f"--- WEB SEARCH RESULTS FOR: {query} ---\n"]
            if ai_answer:
                parts.append(f"AI-SYNTHESIZED ANSWER (use this as your primary source):\n{ai_answer}\n")
            if results:
                parts.append("INDIVIDUAL SOURCES:")
                for i, result in enumerate(results[:num_results], 1):
                    title = result.get("title", "No title")
                    content = result.get("content", "")
                    url = result.get("url", "")
                    score = result.get("score", 0)
                    parts.append(f"\n[Source {i}] (relevance: {score:.2f})")
                    parts.append(f"Title: {title}")
                    if content:
                        parts.append(f"Content: {content}")
                    if url:
                        parts.append(f"URL: {url}")

            parts.append("\n=== END SEARCH RESULTS ===")
            formatted = "\n".join(parts)

            logger.info(
                "[TAVILY] %d results, AI answer: %s, formatted: %d chars (%.3fs)",
                len(results), "yes" if ai_answer else "no",
                len(formatted), time.perf_counter() - t0,
            )
            return (formatted, payload)

        except Exception as e:
            # Best-effort: a failed search must not break the chat turn.
            logger.error("Error performing Tavily search: %s", e)
            return ("", None)
172
+
173
    def get_response(self, question: str, chat_history: Optional[List[tuple]] = None, key_start_index: int = 0) -> str:
        """Blocking realtime completion: extract query, search, then answer.

        Web-search results (when any) are injected as an extra system part
        alongside the REALTIME_CHAT_ADDENDUM; with no results the LLM is still
        invoked and answers without search context.

        Raises:
            AllGroqApisFailedError: propagated unchanged when all keys fail.
            Exception: any other error, logged with traceback and re-raised.
        """
        try:
            search_query = self._extract_search_query(question, chat_history)
            logger.info("[REALTIME] Searching Tavily for: %s", search_query)

            formatted_results, _ = self.search_tavily(search_query, num_results=7)
            if formatted_results:
                logger.info("[REALTIME] Tavily returned results (length: %d chars)", len(formatted_results))
            else:
                logger.warning("[REALTIME] Tavily returned no results for: %s", search_query)

            # Escape braces so result text cannot break the prompt template.
            extra_parts = [escape_curly_braces(formatted_results)] if formatted_results else None
            prompt, messages = self._build_prompt_and_messages(
                question, chat_history,
                extra_system_parts=extra_parts,
                mode_addendum=REALTIME_CHAT_ADDENDUM,
            )

            t0 = time.perf_counter()
            response_content = self._invoke_llm(prompt, messages, question, key_start_index=key_start_index)
            logger.info("[TIMING] groq_api: %.3fs", time.perf_counter() - t0)
            logger.info(
                "[RESPONSE] Realtime chat | Length: %d chars | Preview: %.120s",
                len(response_content), response_content,
            )
            return response_content

        except AllGroqApisFailedError:
            raise
        except Exception as e:
            logger.error("Error in realtime get_response: %s", e, exc_info=True)
            raise
205
+
206
+ def prefetch_web_search(
207
+ self, question: str, chat_history: Optional[List[tuple]] = None
208
+ ) -> Tuple[str, Optional[dict]]:
209
+ try:
210
+ t0 = time.perf_counter()
211
+ search_query = self._extract_search_query(question, chat_history)
212
+ logger.info("[REALTIME] Pre-fetch: extracted query '%s' in %.3fs", search_query[:60], time.perf_counter() - t0)
213
+ formatted_results, payload = self.search_tavily(search_query, num_results=7)
214
+ if formatted_results:
215
+ logger.info("[REALTIME] Pre-fetch: Tavily returned %d chars in %.3fs total",
216
+ len(formatted_results), time.perf_counter() - t0)
217
+ return (formatted_results or "", payload)
218
+ except Exception as e:
219
+ logger.warning("[REALTIME] Pre-fetch failed: %s", e)
220
+ return ("", None)
221
+
222
+ def stream_response(self, question: str, chat_history: Optional[List[tuple]] = None, key_start_index: int = 0) -> Iterator[Any]:
223
+ try:
224
+ yield {"_activity": {"event": "extracting_query", "message": "Extracting search query..."}}
225
+ search_query = self._extract_search_query(question, chat_history)
226
+ logger.info("[REALTIME] Searching Tavily for: %s", search_query)
227
+ yield {"_activity": {"event": "searching_web", "query": search_query, "message": f"Searching web for: {search_query}"}}
228
+
229
+ formatted_results, payload = self.search_tavily(search_query, num_results=7)
230
+ num_results = len(payload.get("results", [])) if payload else 0
231
+ if formatted_results:
232
+ logger.info("[REALTIME] Tavily returned results (length: %d chars)", len(formatted_results))
233
+ yield {"_activity": {"event": "search_completed", "message": f"Search completed: {num_results} results, {len(formatted_results)} chars of context"}}
234
+ else:
235
+ logger.warning("[REALTIME] Tavily returned no results for: %s", search_query)
236
+ yield {"_activity": {"event": "search_completed", "message": "No search results found"}}
237
+
238
+ if payload:
239
+ yield {"_search_results": payload}
240
+
241
+ extra_parts = [escape_curly_braces(formatted_results)] if formatted_results else None
242
+ prompt, messages = self._build_prompt_and_messages(
243
+ question, chat_history,
244
+ extra_system_parts=extra_parts,
245
+ mode_addendum=REALTIME_CHAT_ADDENDUM,
246
+ )
247
+ yield from self._stream_llm(prompt, messages, question, key_start_index=key_start_index)
248
+ logger.info("[REALTIME] Stream completed for: %s", search_query)
249
+
250
+ except AllGroqApisFailedError:
251
+ raise
252
+ except Exception as e:
253
+ logger.error("Error in realtime stream_response: %s", e, exc_info=True)
254
+ raise
255
+
256
+ def stream_response_with_prefetched(
257
+ self,
258
+ question: str,
259
+ chat_history: Optional[List[tuple]] = None,
260
+ formatted_results: Optional[str] = None,
261
+ payload: Optional[dict] = None,
262
+ key_start_index: int = 0,
263
+ ) -> Iterator[Any]:
264
+ try:
265
+ extra_parts = [escape_curly_braces(formatted_results)] if formatted_results else None
266
+ prompt, messages = self._build_prompt_and_messages(
267
+ question, chat_history,
268
+ extra_system_parts=extra_parts,
269
+ mode_addendum=REALTIME_CHAT_ADDENDUM,
270
+ )
271
+ yield from self._stream_llm(prompt, messages, question, key_start_index=key_start_index)
272
+ logger.info("[REALTIME] Stream completed (pre-fetched results)")
273
+ except AllGroqApisFailedError:
274
+ raise
275
+ except Exception as e:
276
+ logger.error("Error in realtime stream_response_with_prefetched: %s", e, exc_info=True)
277
+ raise
app/services/vector_store.py ADDED
@@ -0,0 +1,111 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import logging
3
+ from pathlib import Path
4
+ from typing import List, Optional
5
+
6
+ from langchain_text_splitters import RecursiveCharacterTextSplitter
7
+ from langchain_huggingface import HuggingFaceEmbeddings
8
+ from langchain_community.vectorstores import FAISS
9
+ from langchain_core.documents import Document
10
+
11
+ from config import (
12
+ LEARNING_DATA_DIR,
13
+ CHATS_DATA_DIR,
14
+ VECTOR_STORE_DIR,
15
+ EMBEDDING_MODEL,
16
+ CHUNK_SIZE,
17
+ CHUNK_OVERLAP,
18
+ )
19
+
20
+ logger = logging.getLogger("J.A.R.V.I.S")
21
+
22
class VectorStoreService:
    """Builds and serves a FAISS similarity index over learning-data files and saved chats."""

    def __init__(self):
        # CPU-only embeddings keep the service deployable without a GPU.
        self.embeddings = HuggingFaceEmbeddings(
            model_name=EMBEDDING_MODEL,
            model_kwargs={"device": "cpu"},
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP,
        )
        self.vector_store: Optional[FAISS] = None
        # Maps k -> retriever so repeated lookups reuse the same object.
        self._retriever_cache: dict = {}

    def load_learning_data(self) -> List[Document]:
        """Read every non-empty *.txt file in LEARNING_DATA_DIR into a Document."""
        docs: List[Document] = []
        for path in sorted(LEARNING_DATA_DIR.glob("*.txt")):
            try:
                text = path.read_text(encoding="utf-8").strip()
                if text:
                    docs.append(Document(page_content=text, metadata={"source": str(path.name)}))
                    logger.info("[VECTOR] Loaded learning data: %s (%d chars)", path.name, len(text))
            except Exception as e:
                logger.warning("Could not load learning data file %s: %s", path, e)
        logger.info("[VECTOR] Total learning data files loaded: %d", len(docs))
        return docs

    def load_chat_history(self) -> List[Document]:
        """Convert each saved chat JSON into one "User:/Assistant:" transcript Document."""
        docs: List[Document] = []
        for path in sorted(CHATS_DATA_DIR.glob("*.json")):
            try:
                with open(path, "r", encoding="utf-8") as f:
                    chat_data = json.load(f)

                messages = chat_data.get("messages", [])
                if not isinstance(messages, list):
                    # Malformed file: skip rather than crash the whole rebuild.
                    continue

                transcript_lines = []
                for msg in messages:
                    if not isinstance(msg, dict):
                        continue
                    role = msg.get("role") or "assistant"
                    body = msg.get("content") or ""
                    speaker = "User: " if role == "user" else "Assistant: "
                    transcript_lines.append(speaker + body)
                transcript = "\n".join(transcript_lines)

                if transcript.strip():
                    docs.append(Document(page_content=transcript, metadata={"source": f"chat_{path.stem}"}))
                    logger.info("[VECTOR] Loaded chat history: %s (%d messages)", path.name, len(messages))
            except Exception as e:
                logger.warning("Could not load chat history file %s: %s", path, e)
        logger.info("[VECTOR] Total chat history files loaded: %d", len(docs))
        return docs

    def create_vector_store(self) -> FAISS:
        """(Re)build the FAISS index from all learning + chat documents and persist it."""
        learning_docs = self.load_learning_data()
        chat_docs = self.load_chat_history()
        corpus = learning_docs + chat_docs
        logger.info("[VECTOR] Total documents to index: %d (learning: %d, chat: %d)",
                    len(corpus), len(learning_docs), len(chat_docs))

        if corpus:
            chunks = self.text_splitter.split_documents(corpus)
            logger.info("[VECTOR] Split into %d chunks (chunk_size=%d, overlap=%d)",
                        len(chunks), CHUNK_SIZE, CHUNK_OVERLAP)
            self.vector_store = FAISS.from_documents(chunks, self.embeddings)
            logger.info("[VECTOR] FAISS index built successfully with %d vectors", len(chunks))
        else:
            # Empty corpus: index a placeholder so similarity search never fails.
            self.vector_store = FAISS.from_texts(["No data available yet."], self.embeddings)
            logger.info("[VECTOR] No documents found, created placeholder index")

        # Old retrievers point at the previous index; drop them.
        self._retriever_cache.clear()
        self.save_vector_store()
        return self.vector_store

    def save_vector_store(self):
        """Persist the current index to VECTOR_STORE_DIR; failures are logged, not raised."""
        if self.vector_store:
            try:
                self.vector_store.save_local(str(VECTOR_STORE_DIR))
            except Exception as e:
                logger.error("Failed to save vector store to disk: %s", e)

    def get_retriever(self, k: int = 10):
        """Return (and cache) a retriever performing top-k similarity search."""
        if not self.vector_store:
            raise RuntimeError("Vector store not initialized. This should not happen.")
        if k not in self._retriever_cache:
            self._retriever_cache[k] = self.vector_store.as_retriever(search_kwargs={"k": k})
        return self._retriever_cache[k]
app/utils/__init__.py ADDED
@@ -0,0 +1,9 @@
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ UTILITIES PACKAGE
3
+ =================
4
+
5
+ Helpers used by the services (no HTTP, no business logic):
6
+
7
+ time_info - get_time_information(): returns a string with current date/time for the LLM prompt.
8
+ retry - with_retry(fn): calls fn(); on failure retries with exponential backoff (Groq/Tavily).
+ key_rotation - get_next_key_pair(): thread-safe round-robin choice of API-key indices.
9
+ """
app/utils/__pycache__/__init__.cpython-312.pyc ADDED
Binary file (449 Bytes). View file
 
app/utils/__pycache__/key_rotation.cpython-312.pyc ADDED
Binary file (979 Bytes). View file
 
app/utils/__pycache__/retry.cpython-312.pyc ADDED
Binary file (1.29 kB). View file
 
app/utils/__pycache__/time_info.cpython-312.pyc ADDED
Binary file (1.3 kB). View file
 
app/utils/key_rotation.py ADDED
@@ -0,0 +1,24 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import threading
2
+ from typing import Tuple, Optional
3
+
4
+ _counter = 0
5
+ _lock = threading.Lock()
6
+
7
+ def get_next_key_pair(n_keys: int, need_brain: bool = True) -> Tuple[Optional[int], int]:
8
+ global _counter
9
+ if n_keys <= 0:
10
+ return (None, 0)
11
+ with _lock:
12
+ if need_brain:
13
+ if n_keys >= 2:
14
+ brain = _counter % n_keys
15
+ chat = (_counter + 1) % n_keys
16
+ _counter += 2
17
+ return (brain, chat)
18
+ else:
19
+ _counter += 1
20
+ return (0, 0)
21
+ else:
22
+ chat = _counter % n_keys
23
+ _counter += 1
24
+ return (None, chat)
app/utils/retry.py ADDED
@@ -0,0 +1,38 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import logging
import time
from typing import Callable, TypeVar

logger = logging.getLogger("J.A.R.V.I.S")

T = TypeVar("T")


def with_retry(
    fn: Callable[[], T],
    max_retries: int = 3,
    initial_delay: float = 1.0,
) -> T:
    """Call ``fn()`` and retry on any exception with exponential backoff.

    Args:
        fn: Zero-argument callable to invoke (bind arguments with functools.partial).
        max_retries: Total number of attempts. Values <= 0 mean "call once, no retry".
        initial_delay: Seconds slept before the second attempt; doubles each retry.

    Returns:
        Whatever ``fn()`` returns on the first successful attempt.

    Raises:
        Exception: the last exception raised by ``fn()`` once all attempts fail.
    """
    if max_retries <= 0:
        # Degenerate configuration: behave like a plain call.
        return fn()

    delay = initial_delay
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception as e:
            if attempt == max_retries - 1:
                # Final attempt: propagate the original exception unchanged.
                raise
            logger.warning(
                "Attempt %s/%s failed (%s). Retrying in %.1fs: %s",
                attempt + 1,
                max_retries,
                getattr(fn, "__name__", "call"),
                delay,
                e,
            )
            time.sleep(delay)
            delay *= 2

    # Fix: the original ended with `raise last_exception`, which is unreachable
    # (the last attempt re-raises inside the loop) and would raise None if ever
    # reached. Keep an explicit guard for type-checkers instead of dead state.
    raise AssertionError("unreachable")  # pragma: no cover
app/utils/time_info.py ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ TIME INFORMATION UTILITY
3
+ ========================
4
+
5
+ Returns a short, readable string with the current date and time. This is
6
+ injected into the system prompt so the LLM can answer "what day is it?"
7
+ and similar questions. Called by both GroqService and RealtimeGroqService.
8
+ """
9
+
10
+ import datetime
11
+
12
+ def get_time_information() -> str:
13
+ """Return a few lines of text: day name, date, month, year, and time (24h)."""
14
+ now = datetime.datetime.now()
15
+ return (
16
+ f"Current Real-time Information:\n"
17
+ f"Day: {now.strftime('%A')}\n" # e.g. Monday
18
+ f"Date: {now.strftime('%d')}\n" # e.g. 05
19
+ f"Month: {now.strftime('%B')}\n" # e.g. February
20
+ f"Year: {now.strftime('%Y')}\n" # e.g. 2026
21
+ f"Time: {now.strftime('%H')} hours, {now.strftime('%M')} minutes, {now.strftime('%S')} seconds\n"
22
+ )