| import logging |
| import os |
| import asyncio |
| import hashlib |
| import datetime |
| import json |
| import re |
| import csv |
| from pathlib import Path |
| from typing import Any, Dict |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
| try: |
| import nest_asyncio |
| nest_asyncio.apply() |
| except (ValueError, ImportError): |
| pass |
|
|
| import common_utils |
| import inference_logic |
|
|
| |
|
|
| def analyze_video_veracity(video_url: str, specific_question: str = "", agent_config: dict = None) -> dict: |
| """Tool to analyze video veracity.""" |
| if agent_config is None: agent_config = {} |
| loop = asyncio.get_event_loop() |
| if loop.is_running(): |
| import concurrent.futures |
| with concurrent.futures.ThreadPoolExecutor() as pool: |
| return pool.submit(asyncio.run, _analyze_video_async(video_url, specific_question, agent_config)).result() |
| else: |
| return asyncio.run(_analyze_video_async(video_url, specific_question, agent_config)) |
|
|
| async def _analyze_video_async(video_url: str, context: str, agent_config: dict) -> dict: |
| try: |
| use_search = agent_config.get("use_search", False) |
| use_code = agent_config.get("use_code", False) |
| provider = agent_config.get("provider", "vertex") |
| api_key = agent_config.get("api_key", os.getenv("GEMINI_API_KEY", "")) |
| project_id = agent_config.get("project_id", os.getenv("VERTEX_PROJECT_ID", "")) |
| location = agent_config.get("location", os.getenv("VERTEX_LOCATION", "us-central1")) |
| model_name = agent_config.get("model_name", os.getenv("VERTEX_MODEL_NAME", "gemini-1.5-pro")) |
| reasoning_method = agent_config.get("reasoning_method", "cot") |
| prompt_template = agent_config.get("prompt_template", "standard") |
| |
| request_id = hashlib.md5(f"{video_url}_{datetime.datetime.now()}".encode()).hexdigest()[:10] |
| assets = await common_utils.prepare_video_assets(video_url, request_id) |
| |
| |
| try: |
| from labeling_logic import PROMPT_VARIANTS |
| sel_p = PROMPT_VARIANTS.get(prompt_template, PROMPT_VARIANTS['standard']) |
| system_persona_txt = sel_p['instruction'] |
| except Exception: |
| system_persona_txt = "You are a Factuality Agent." |
| |
| system_persona = f"You are the LiarMP4 Verifier. Context: {context}\n\nPersona: {system_persona_txt}" |
| |
| trans = common_utils.parse_vtt(assets['transcript']) if assets.get('transcript') else "No transcript." |
|
|
| final_result = None |
| raw_toon_text = "" |
| pipeline_logs =[] |
| |
| if provider == "gemini": |
| if not api_key: |
| return {"error": "Gemini API Key missing. Please provide it in the Inference Config."} |
| gemini_config = {"api_key": api_key, "model_name": model_name, "max_retries": 3, "use_search": use_search, "use_code": use_code} |
| async for chunk in inference_logic.run_gemini_labeling_pipeline( |
| video_path=assets.get('video'), |
| caption=assets.get('caption', ''), |
| transcript=trans, |
| gemini_config=gemini_config, |
| include_comments=False, |
| reasoning_method=reasoning_method, |
| system_persona=system_persona, |
| request_id=request_id |
| ): |
| if isinstance(chunk, str): |
| pipeline_logs.append(chunk.strip()) |
| elif isinstance(chunk, dict) and "parsed_data" in chunk: |
| final_result = chunk["parsed_data"] |
| raw_toon_text = chunk.get("raw_toon", "") |
| else: |
| if not project_id: |
| return {"error": "Vertex Project ID missing. Please provide it in the Inference Config."} |
| vertex_config = { |
| "project_id": project_id, |
| "location": location, |
| "model_name": model_name, |
| "max_retries": 3, |
| "use_search": use_search, |
| "use_code": use_code, |
| "api_key": api_key |
| } |
| async for chunk in inference_logic.run_vertex_labeling_pipeline( |
| video_path=assets.get('video'), |
| caption=assets.get('caption', ''), |
| transcript=trans, |
| vertex_config=vertex_config, |
| include_comments=False, |
| reasoning_method=reasoning_method, |
| system_persona=system_persona, |
| request_id=request_id |
| ): |
| if isinstance(chunk, str): |
| pipeline_logs.append(chunk.strip()) |
| elif isinstance(chunk, dict) and "parsed_data" in chunk: |
| final_result = chunk["parsed_data"] |
| raw_toon_text = chunk.get("raw_toon", "") |
| |
| if final_result: |
| |
| gt_score = None |
| manual_path = Path("data/manual_dataset.csv") |
| if manual_path.exists(): |
| for row in common_utils.robust_read_csv(manual_path): |
| if common_utils.normalize_link(row.get('link', '')) == common_utils.normalize_link(video_url): |
| try: gt_score = float(row.get('final_veracity_score', 0)) |
| except: pass |
| break |
|
|
| |
| ai_score_val = final_result.get('final_assessment', {}).get('veracity_score_total', 0) |
| try: ai_score = float(ai_score_val) |
| except: ai_score = 0 |
| |
| reasoning = final_result.get('final_assessment', {}).get('reasoning', 'No reasoning provided.') |
|
|
| vec = final_result.get('veracity_vectors', {}) |
| mod = final_result.get('modalities', {}) |
| fact = final_result.get('factuality_factors', {}) |
|
|
| reply_text = f"[ANALYSIS COMPLETE]\nVideo: {video_url}\n\n" |
| reply_text += "--- AGENT PIPELINE LOGS ---\n" |
| reply_text += "\n".join([log for log in pipeline_logs if log]) + "\n\n" |
|
|
| reply_text += f"Final Veracity Score: {ai_score}/100\n" |
| reply_text += f"Reasoning: {reasoning}\n\n" |
| |
| reply_text += "--- VERACITY VECTORS ---\n" |
| reply_text += f"Visual Integrity : {vec.get('visual_integrity_score', 'N/A')}\n" |
| reply_text += f"Audio Integrity : {vec.get('audio_integrity_score', 'N/A')}\n" |
| reply_text += f"Source Credibility : {vec.get('source_credibility_score', 'N/A')}\n" |
| reply_text += f"Logical Consistency : {vec.get('logical_consistency_score', 'N/A')}\n" |
| reply_text += f"Emotional Manipulation : {vec.get('emotional_manipulation_score', 'N/A')}\n\n" |
| |
| reply_text += "--- MODALITIES ---\n" |
| reply_text += f"Video-Audio : {mod.get('video_audio_score', 'N/A')}\n" |
| reply_text += f"Video-Caption : {mod.get('video_caption_score', 'N/A')}\n" |
| reply_text += f"Audio-Caption : {mod.get('audio_caption_score', 'N/A')}\n" |
|
|
| reply_text += "\n--- FACTUALITY FACTORS ---\n" |
| reply_text += f"Claim Accuracy : {fact.get('claim_accuracy', 'N/A')}\n" |
| reply_text += f"Evidence Gap : {fact.get('evidence_gap', 'N/A')}\n" |
| reply_text += f"Grounding Check : {fact.get('grounding_check', 'N/A')}\n" |
|
|
| if gt_score is not None: |
| delta = abs(ai_score - gt_score) |
| reply_text += f"\n--- GROUND TRUTH COMPARISON ---\n" |
| reply_text += f"Verified GT Score : {gt_score}/100\n" |
| reply_text += f"AI Generated Score : {ai_score}/100\n" |
| reply_text += f"Accuracy Delta : {delta} points\n" |
| |
| reply_text += "\n--- RAW TOON OUTPUT ---\n" |
| reply_text += f"{raw_toon_text}\n\n" |
|
|
| config_params_str = json.dumps({"agent_active": True, "use_search": use_search, "use_code": use_code}) |
| |
| |
| d_path = Path("data/dataset.csv") |
| try: |
| with open(d_path, 'a', newline='', encoding='utf-8') as f: |
| row = { |
| "id": request_id, "link": video_url, "timestamp": datetime.datetime.now().isoformat(), |
| "caption": assets.get('caption', ''), |
| "final_veracity_score": ai_score, |
| "visual_score": final_result.get('veracity_vectors', {}).get('visual_integrity_score', 0), |
| "audio_score": final_result.get('veracity_vectors', {}).get('audio_integrity_score', 0), |
| "source_score": final_result.get('veracity_vectors', {}).get('source_credibility_score', 0), |
| "logic_score": final_result.get('veracity_vectors', {}).get('logical_consistency_score', 0), |
| "emotion_score": final_result.get('veracity_vectors', {}).get('emotional_manipulation_score', 0), |
| "align_video_audio": final_result.get('modalities', {}).get('video_audio_score', 0), |
| "align_video_caption": final_result.get('modalities', {}).get('video_caption_score', 0), |
| "align_audio_caption": final_result.get('modalities', {}).get('audio_caption_score', 0), |
| "classification": final_result.get('disinformation_analysis', {}).get('classification', 'None'), |
| "reasoning": reasoning, |
| "tags": ",".join(final_result.get('tags',[])), |
| "raw_toon": raw_toon_text, |
| "config_type": "A2A Agent", |
| "config_model": model_name, |
| "config_prompt": prompt_template, |
| "config_reasoning": reasoning_method, |
| "config_params": config_params_str |
| } |
| writer = csv.DictWriter(f, fieldnames=[ |
| "id", "link", "timestamp", "caption", |
| "final_veracity_score", "visual_score", "audio_score", "source_score", "logic_score", "emotion_score", |
| "align_video_audio", "align_video_caption", "align_audio_caption", |
| "classification", "reasoning", "tags", "raw_toon", |
| "config_type", "config_model", "config_prompt", "config_reasoning", "config_params" |
| ], extrasaction='ignore') |
| if not d_path.exists() or d_path.stat().st_size == 0: writer.writeheader() |
| writer.writerow(row) |
| except Exception as e: |
| logger.error(f"Failed writing A2A to dataset: {e}") |
|
|
| |
| try: |
| ts_clean = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") |
| flat_parsed = final_result.copy() |
| flat_parsed["raw_toon"] = raw_toon_text |
| flat_parsed["meta_info"] = { |
| "id": request_id, "timestamp": datetime.datetime.now().isoformat(), "link": video_url, |
| "prompt_used": "A2A Agent Prompt", |
| "model_selection": provider, |
| "config_type": "GenAI A2A", |
| "config_model": model_name, |
| "config_prompt": prompt_template, |
| "config_reasoning": reasoning_method, |
| "config_params": {"agent_active": True, "use_search": use_search, "use_code": use_code} |
| } |
| with open(Path(f"data/labels/{request_id}_{ts_clean}.json"), 'w', encoding='utf-8') as f: |
| json.dump(flat_parsed, f, indent=2, ensure_ascii=False) |
| except Exception as e: |
| logger.error(f"Failed saving A2A raw JSON sidecar: {e}") |
|
|
| reply_text += f"\n[Pipeline] Successfully parsed context, analyzed factuality, and saved raw AI Label File to Data Manager (Provider: {provider}, Model: {model_name}, Search: {use_search})." |
|
|
| return {"text": reply_text, "data": final_result} |
| |
| return {"error": "Inference yielded no data or credentials missing."} |
|
|
| except Exception as e: |
| logger.error(f"[Tool Error] {e}") |
| return {"error": str(e)} |
|
|
| |
| def create_a2a_app(): |
| """Creates a robust Starlette/FastAPI app that implements core A2A JSON-RPC behavior.""" |
| from fastapi import FastAPI, Request |
| |
| a2a_app = FastAPI(title="LiarMP4 A2A Agent") |
|
|
| @a2a_app.post("/") |
| @a2a_app.post("/jsonrpc") |
| async def jsonrpc_handler(request: Request): |
| try: |
| data = await request.json() |
| method = data.get("method", "agent.process") |
| params = data.get("params", {}) |
| |
| input_text = "" |
| agent_config = {} |
| if isinstance(params, dict): |
| input_text = params.get("input", params.get("text", params.get("query", params.get("prompt", "")))) |
| agent_config = params.get("agent_config", {}) |
| if not input_text and "url" in params: |
| input_text = params["url"] |
| elif isinstance(params, list) and len(params) > 0: |
| if isinstance(params[0], dict): |
| input_text = params[0].get("text", params[0].get("input", "")) |
| else: |
| input_text = str(params[0]) |
| elif isinstance(params, str): |
| input_text = params |
| |
| |
| accepted_methods =["agent.process", "agent.generate", "model.generate", "a2a.generate", "a2a.interact", "agent.interact"] |
| |
| if method in accepted_methods or not method: |
| |
| |
| update_config = {} |
| low_input = str(input_text).lower() |
| if "set provider to " in low_input: |
| val = low_input.split("set provider to ")[-1].strip().split()[0] |
| if val in["gemini", "vertex"]: update_config["provider"] = val |
| if "set api key to " in low_input: |
| val = input_text.split("set api key to ")[-1].strip().split()[0] |
| update_config["api_key"] = val |
| if "set project id to " in low_input: |
| val = input_text.split("set project id to ")[-1].strip().split()[0] |
| update_config["project_id"] = val |
| |
| if update_config: |
| return { |
| "jsonrpc": "2.0", "id": data.get("id", 1), |
| "result": { |
| "text": f"✅ Agent configuration updated automatically ({', '.join(update_config.keys())}). You can now provide a video link or further instructions.", |
| "update_config": update_config |
| } |
| } |
|
|
| urls = re.findall(r'(https?://[^\s]+)', str(input_text)) |
| |
| if urls: |
| url = urls[0] |
| logger.info(f"Agent Processing Video URL: {url} | Config: {agent_config}") |
| res = await _analyze_video_async(url, str(input_text), agent_config) |
| |
| if "error" in res: |
| reply = f"Error analyzing video: {res['error']}" |
| else: |
| reply = res.get("text", "Processing finished but no reply generated.") |
| else: |
| |
| provider = agent_config.get("provider", "vertex") |
| api_key = agent_config.get("api_key", "") |
| project_id = agent_config.get("project_id", "") |
| |
| base_capabilities = ( |
| "**Agent Capabilities:**\n" |
| "- Process raw video & audio modalities via A2A\n" |
| "- Fetch & analyze comment sentiment and community context\n" |
| "- Run full Factuality pipeline (FCoT) & Generate Veracity Vectors\n" |
| "- Automatically save raw AI Labeled JSON files & sync to Data Manager\n" |
| "- Verify and compare AI outputs against Ground Truth\n" |
| "- Reprompt dynamically for missing scores or incomplete data\n\n" |
| "**Easy Command:**\n" |
| "Use `Run full pipeline on[URL]` to analyze a video, extract all vectors (source, logic, emotion, etc.), and save aligned files." |
| ) |
| |
| if provider == 'vertex' and not project_id: |
| reply = f"Welcome to the LiarMP4 Agent Nexus!\n\nIt looks like you haven't configured **Vertex AI** yet. Please enter your Google Cloud Project ID in the 'Inference Config' panel on the left, or tell me directly: *'set project id to[YOUR_PROJECT]'*.\n\n{base_capabilities}" |
| elif provider == 'gemini' and not api_key: |
| reply = f"👋 Welcome to the LiarMP4 Agent Nexus!\n\nIt looks like you haven't configured **Gemini** yet. Please enter your API Key in the 'Inference Config' panel on the left, or tell me directly: *'set api key to[YOUR_KEY]'*.\n\n{base_capabilities}" |
| else: |
| reply = f"✅ I am the LiarMP4 Verifier, fully configured ({provider.capitalize()}) and ready!\n\n{base_capabilities}" |
|
|
| return { |
| "jsonrpc": "2.0", |
| "id": data.get("id", 1), |
| "result": { |
| "text": reply, |
| "data": {"status": "success", "agent": "LiarMP4_A2A"} |
| } |
| } |
| else: |
| logger.warning(f"A2A Agent rejected unknown method: {method}") |
| return { |
| "jsonrpc": "2.0", |
| "id": data.get("id", 1), |
| "error": { |
| "code": -32601, |
| "message": f"Method '{method}' not found. Supported: {', '.join(accepted_methods)}" |
| } |
| } |
| except Exception as e: |
| logger.error(f"A2A Parse Error: {e}") |
| return {"jsonrpc": "2.0", "id": None, "error": {"code": -32700, "message": "Parse error"}} |
| |
| logger.info("✅ A2A Custom Agent App created successfully.") |
| return a2a_app |
|
|