Spaces:
Running
Running
| # handlers/agentic_handlers.py | |
| import gradio as gr | |
| import logging | |
| from collections import defaultdict | |
| import json # Added for JSON serialization/deserialization | |
| # Attempt to import agentic pipeline functions and UI formatters | |
try:
    from run_agentic_pipeline import run_full_analytics_orchestration
    from ui.insights_ui_generator import (
        format_report_to_markdown,
        extract_key_results_for_selection,
        format_single_okr_for_display,
    )
except ImportError as e:
    logging.error(f"Could not import agentic pipeline modules for AgenticHandlers: {e}.")
    AGENTIC_MODULES_LOADED = False

    # Stand-ins with the same names as the real pipeline/UI helpers, so that
    # code referring to them still resolves when the imports above fail.

    async def run_full_analytics_orchestration(*args, **kwargs):
        """Fallback orchestration: accept any arguments and produce no result."""
        return None

    def format_report_to_markdown(report_string):
        """Fallback formatter: return a fixed 'report unavailable' notice."""
        return "Agentic modules not loaded. Report unavailable."

    def extract_key_results_for_selection(okrs_dict):
        """Fallback extractor: return an empty list of key results."""
        return []

    def format_single_okr_for_display(okr_data, **kwargs):
        """Fallback OKR formatter: return a fixed 'display unavailable' notice."""
        return "Agentic modules not loaded. OKR display unavailable."
else:
    # Reached only when every import above succeeded.
    AGENTIC_MODULES_LOADED = True
class AgenticHandlers:
    """Gradio handlers for the agentic report and OKR tabs.

    The handlers call ``run_full_analytics_orchestration``, turn its output
    into markdown for the report/OKR components, and emit three state values
    (raw orchestration results, selected key-result ids, key results available
    for selection) serialized as JSON strings.
    """

    def __init__(self, agentic_report_components, agentic_okrs_components,
                 token_state_ref, orchestration_raw_results_st_ref,
                 key_results_for_selection_st_ref, selected_key_result_ids_st_ref):
        """Store the component maps and the state references.

        Args:
            agentic_report_components: Mapping looked up by the keys
                ``"agentic_pipeline_status_md"`` and
                ``"agentic_report_display_md"``.
            agentic_okrs_components: Mapping looked up by the keys
                ``"key_results_cbg"`` and ``"okr_detail_display_md"``.
            token_state_ref: Reference to the token state (stored only).
            orchestration_raw_results_st_ref: State holding the raw
                orchestration results.
            key_results_for_selection_st_ref: State holding the key results
                that can be selected.
            selected_key_result_ids_st_ref: State holding the selected
                key-result ids.
        """
        self.report_components = agentic_report_components
        self.okrs_components = agentic_okrs_components

        # References to global states
        self.token_state = token_state_ref
        self.orchestration_raw_results_st = orchestration_raw_results_st_ref
        self.key_results_for_selection_st = key_results_for_selection_st_ref
        self.selected_key_result_ids_st = selected_key_result_ids_st_ref

        self.agentic_modules_really_loaded = AGENTIC_MODULES_LOADED
        logging.info(f"AgenticHandlers initialized. Modules loaded: {self.agentic_modules_really_loaded}")

    @staticmethod
    def _component_update(components, key, value):
        """Return ``gr.update(value=value)`` if *key* maps to a component, else a no-op update."""
        return gr.update(value=value) if components.get(key) else gr.update()

    @staticmethod
    def _format_all_okrs(actionable_okrs):
        """Render every OKR in ``actionable_okrs["okrs"]`` with no key-result filter.

        Returns an empty list when the payload is not a dict or its ``"okrs"``
        entry is not a list, so a malformed payload cannot raise here.
        """
        if not isinstance(actionable_okrs, dict):
            return []
        okrs = actionable_okrs.get("okrs")
        if not isinstance(okrs, list):
            return []
        return [
            format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx)
            for okr_idx, okr_item in enumerate(okrs)
        ]

    async def run_agentic_pipeline_autonomously_on_update(self, current_token_state_val):
        """Run the agentic pipeline and stream UI/state updates.

        Intended to be triggered by changes in token_state. Every yield is a
        7-tuple: status markdown update, report markdown update, key-results
        checkbox update, OKR detail markdown update, then three JSON strings
        (orchestration results, selected key-result ids, key results for
        selection). The checkbox update only resets its value; choices are not
        changed here.

        Args:
            current_token_state_val: Token state mapping; the pipeline runs
                only when it has a truthy ``"token"`` entry. ``None`` or an
                empty mapping is treated as "token not available".
        """
        # Evaluate the token once, guarding against None before calling .get().
        has_token = bool(current_token_state_val and current_token_state_val.get("token"))
        logging.info(f"Agentic pipeline auto-trigger. Token: {'Set' if has_token else 'Not Set'}")

        initial_report_status = "Pipeline AI: In attesa dei dati necessari..."
        initial_okr_details = "Pipeline AI: In attesa dei dati necessari..."

        # NOTE(review): `.value` is read from the state objects themselves;
        # presumably this is the state's initial value rather than a
        # per-session value -- confirm against the Gradio State documentation.
        initial_orchestration_results = self.orchestration_raw_results_st.value
        initial_selected_krs = self.selected_key_result_ids_st.value
        initial_krs_for_selection = self.key_results_for_selection_st.value

        report_status_md_update = self._component_update(
            self.report_components, "agentic_pipeline_status_md", initial_report_status
        )
        report_display_md_update = gr.update()
        # Simple update for checkbox component - just reset value
        okrs_cbg_update = self._component_update(self.okrs_components, "key_results_cbg", [])
        okrs_detail_md_update = self._component_update(
            self.okrs_components, "okr_detail_display_md", initial_okr_details
        )

        if not has_token:
            logging.info("Agentic pipeline: Token not available in token_state. Skipping actual run.")
            yield (
                report_status_md_update,
                report_display_md_update,
                okrs_cbg_update,
                okrs_detail_md_update,
                json.dumps(initial_orchestration_results),
                json.dumps(initial_selected_krs),
                json.dumps(initial_krs_for_selection),
            )
            return

        # First visible step: tell the user the analysis is running.
        report_status_md_update = self._component_update(
            self.report_components, "agentic_pipeline_status_md", "Analisi AI (Sempre) in corso..."
        )
        okrs_detail_md_update = self._component_update(
            self.okrs_components, "okr_detail_display_md",
            "Dettagli OKR (Sempre) in corso di generazione..."
        )
        yield (
            report_status_md_update, report_display_md_update, okrs_cbg_update, okrs_detail_md_update,
            json.dumps(initial_orchestration_results),
            json.dumps(initial_selected_krs),
            json.dumps(initial_krs_for_selection),
        )

        if not self.agentic_modules_really_loaded:
            logging.warning("Agentic modules not loaded. Skipping autonomous pipeline actual run.")
            error_status = "Moduli AI non caricati. Operazione non disponibile."
            report_status_md_update = self._component_update(
                self.report_components, "agentic_pipeline_status_md", error_status
            )
            report_display_md_update = self._component_update(
                self.report_components, "agentic_report_display_md", error_status
            )
            okrs_cbg_update = self._component_update(self.okrs_components, "key_results_cbg", [])
            okrs_detail_md_update = self._component_update(
                self.okrs_components, "okr_detail_display_md", error_status
            )
            yield (
                report_status_md_update, report_display_md_update, okrs_cbg_update, okrs_detail_md_update,
                json.dumps(None), json.dumps([]), json.dumps([]),
            )
            return

        try:
            # The autonomous run always uses the "Sempre" filter with no custom range.
            date_filter_val_agentic = "Sempre"
            custom_start_val_agentic = None
            custom_end_val_agentic = None

            logging.info("Agentic pipeline: Calling run_full_analytics_orchestration...")
            orchestration_output = await run_full_analytics_orchestration(
                current_token_state_val,
                date_filter_val_agentic,
                custom_start_val_agentic,
                custom_end_val_agentic,
            )
            logging.info(f"Autonomous agentic pipeline finished. Output keys: {orchestration_output.keys() if orchestration_output else 'None'}")

            orchestration_results_update_val = None
            selected_krs_update_val = []  # Selection is always reset after a run
            krs_for_selection_update_val = []
            # For checkbox, just reset the value - don't update choices programmatically
            okrs_cbg_update = self._component_update(self.okrs_components, "key_results_cbg", [])

            if orchestration_output:
                final_status_text = "Pipeline AI (Sempre) completata."
                orchestration_results_update_val = orchestration_output

                if self.report_components.get("agentic_report_display_md"):
                    report_str = orchestration_output.get(
                        'comprehensive_analysis_report', "Nessun report testuale fornito."
                    )
                    formatted_report = format_report_to_markdown(report_str)
                    # Ensure we have a string, not a list
                    if isinstance(formatted_report, list):
                        formatted_report = "\n".join(formatted_report)
                    elif not isinstance(formatted_report, str):
                        formatted_report = str(formatted_report)
                    report_display_md_update = gr.update(value=formatted_report)

                actionable_okrs = orchestration_output.get('actionable_okrs_and_tasks')
                krs_for_selection_update_val = extract_key_results_for_selection(actionable_okrs)

                all_okrs_md_parts = self._format_all_okrs(actionable_okrs)
                if all_okrs_md_parts:
                    okr_details_text = "\n\n---\n\n".join(all_okrs_md_parts)
                else:
                    okr_details_text = "Nessun OKR generato o trovato (Sempre)."
                okrs_detail_md_update = self._component_update(
                    self.okrs_components, "okr_detail_display_md", okr_details_text
                )
            else:
                final_status_text = "Pipeline AI (Sempre): Nessun risultato prodotto."
                report_display_md_update = self._component_update(
                    self.report_components, "agentic_report_display_md",
                    "Nessun report generato dalla pipeline AI (Sempre)."
                )
                okrs_detail_md_update = self._component_update(
                    self.okrs_components, "okr_detail_display_md",
                    "Nessun OKR generato o errore nella pipeline AI (Sempre)."
                )

            report_status_md_update = self._component_update(
                self.report_components, "agentic_pipeline_status_md", final_status_text
            )

            # Serialization stays inside the try so that a non-JSON-serializable
            # pipeline result is reported through the error path below.
            yield (
                report_status_md_update, report_display_md_update, okrs_cbg_update, okrs_detail_md_update,
                json.dumps(orchestration_results_update_val),
                json.dumps(selected_krs_update_val),
                json.dumps(krs_for_selection_update_val),
            )
        except Exception as e:
            # Handler boundary: log with traceback and surface the error in the UI.
            logging.error(f"Error during autonomous agentic pipeline execution: {e}", exc_info=True)
            report_status_md_update = self._component_update(
                self.report_components, "agentic_pipeline_status_md",
                f"Errore pipeline AI (Sempre): {str(e)}"
            )
            report_display_md_update = self._component_update(
                self.report_components, "agentic_report_display_md",
                f"Errore generazione report AI (Sempre): {str(e)}"
            )
            okrs_cbg_update = self._component_update(self.okrs_components, "key_results_cbg", [])
            okrs_detail_md_update = self._component_update(
                self.okrs_components, "okr_detail_display_md",
                f"Errore generazione OKR AI (Sempre): {str(e)}"
            )
            yield (
                report_status_md_update, report_display_md_update, okrs_cbg_update, okrs_detail_md_update,
                json.dumps(None), json.dumps([]), json.dumps([]),
            )

    def update_okr_display_on_kr_selection(self, selected_kr_unique_ids: list,
                                           raw_orchestration_results_json: str,
                                           all_krs_for_selection_list_json: str):
        """Rebuild the OKR detail markdown for the current key-result selection.

        With no selection every OKR is rendered unfiltered; with a selection
        only OKRs owning at least one selected key result are rendered, limited
        to those key results.

        Args:
            selected_kr_unique_ids: Values from the key-results checkbox group;
                anything that is not a list is treated as an empty selection.
            raw_orchestration_results_json: JSON string of the orchestration
                results (expected to decode to a dict).
            all_krs_for_selection_list_json: JSON string of a list of dicts
                with ``unique_kr_id``, ``okr_index`` and ``kr_index`` keys.

        Returns:
            A ``gr.update`` carrying the markdown (or a user-facing message)
            for the OKR detail component.
        """
        if not self.agentic_modules_really_loaded:
            return gr.update(value="Moduli AI non caricati. Impossibile visualizzare i dettagli OKR.")

        # Handle case where selected_kr_unique_ids might be None or not a list
        if not isinstance(selected_kr_unique_ids, list):
            selected_kr_unique_ids = []

        parsed_orchestration_results = None
        try:
            if raw_orchestration_results_json:
                parsed_orchestration_results = json.loads(raw_orchestration_results_json)
        except (json.JSONDecodeError, TypeError) as e:
            logging.error(f"Failed to parse raw_orchestration_results_json: {raw_orchestration_results_json}. Error: {e}")
            return gr.update(value="Errore: Dati interni corrotti (orchestration results).")

        if not parsed_orchestration_results:  # None, JSON null, or an empty container
            return gr.update(value="Nessun dato dalla pipeline AI (orchestration results).")

        parsed_krs_for_selection_list = []
        try:
            if all_krs_for_selection_list_json:
                parsed_krs_for_selection_list = json.loads(all_krs_for_selection_list_json)
        except (json.JSONDecodeError, TypeError) as e:
            logging.error(f"Failed to parse all_krs_for_selection_list_json: {all_krs_for_selection_list_json}. Error: {e}")
            return gr.update(value="Errore: Dati interni corrotti (krs for selection).")

        # JSON 'null' or any other non-list payload degrades to "nothing selectable".
        if not isinstance(parsed_krs_for_selection_list, list):
            logging.warning(f"Parsed all_krs_for_selection_list is not a list: {type(parsed_krs_for_selection_list)}. Defaulting to empty list.")
            parsed_krs_for_selection_list = []

        actionable_okrs_dict = (
            parsed_orchestration_results.get("actionable_okrs_and_tasks")
            if isinstance(parsed_orchestration_results, dict) else None
        )
        # Require a dict explicitly so a malformed payload yields the message
        # below instead of an AttributeError on .get().
        if not isinstance(actionable_okrs_dict, dict) or not isinstance(actionable_okrs_dict.get("okrs"), list):
            return gr.update(value="Nessun OKR trovato nei risultati della pipeline (o dati in formato imprevisto).")

        okrs_list = actionable_okrs_dict["okrs"]
        if not okrs_list:
            return gr.update(value="Nessun OKR generato.")

        # Map each key-result id to its (OKR index, key-result index) position.
        kr_id_to_indices = {}
        for kr_info in parsed_krs_for_selection_list:
            if isinstance(kr_info, dict) and 'unique_kr_id' in kr_info and 'okr_index' in kr_info and 'kr_index' in kr_info:
                kr_id_to_indices[kr_info['unique_kr_id']] = (kr_info['okr_index'], kr_info['kr_index'])
            else:
                logging.warning(f"Skipping invalid kr_info item: {kr_info}")

        # Group the selected key-result indices by the OKR they belong to;
        # ids that are not in the map are ignored.
        selected_krs_by_okr_idx = defaultdict(list)
        for kr_unique_id in selected_kr_unique_ids:
            if kr_unique_id in kr_id_to_indices:
                okr_idx, kr_idx_in_okr = kr_id_to_indices[kr_unique_id]
                selected_krs_by_okr_idx[okr_idx].append(kr_idx_in_okr)

        output_md_parts = []
        for okr_idx, okr_data in enumerate(okrs_list):
            if selected_kr_unique_ids:
                # .get() avoids creating empty defaultdict entries for unselected OKRs.
                accepted_indices_for_this_okr = selected_krs_by_okr_idx.get(okr_idx)
                if accepted_indices_for_this_okr is None:
                    continue
            else:
                accepted_indices_for_this_okr = None
            output_md_parts.append(
                format_single_okr_for_display(
                    okr_data,
                    accepted_kr_indices=accepted_indices_for_this_okr,
                    okr_main_index=okr_idx,
                )
            )

        # okrs_list is non-empty here, so an empty result is only possible when
        # a selection was made and none of it matched a known key result.
        if not output_md_parts:
            return gr.update(value="Nessun OKR corrisponde alla selezione corrente o i KR selezionati non hanno task dettagliati.")
        return gr.update(value="\n\n---\n\n".join(output_md_parts))

    def setup_event_handlers(self):
        """Sets up event handlers for the agentic OKRs tab.

        Wires the key-results checkbox group's ``change`` event to
        ``update_okr_display_on_kr_selection``. Nothing is wired when the
        agentic modules are not loaded or when either the checkbox group or
        the OKR detail display component is missing.
        """
        if not self.agentic_modules_really_loaded:
            logging.warning("Agentic modules not loaded. Skipping agentic event handler setup.")
            return

        key_results_cbg = self.okrs_components.get("key_results_cbg")
        if not key_results_cbg:
            logging.warning("key_results_cbg component not found for agentic OKR handler setup.")
            return

        # The output component is looked up with .get() like everywhere else,
        # so a missing entry is logged instead of raising KeyError.
        okr_detail_display_md = self.okrs_components.get("okr_detail_display_md")
        if okr_detail_display_md is None:
            logging.warning("okr_detail_display_md component not found for agentic OKR handler setup.")
            return

        key_results_cbg.change(
            fn=self.update_okr_display_on_kr_selection,
            inputs=[
                key_results_cbg,
                self.orchestration_raw_results_st,
                self.key_results_for_selection_st,
            ],
            outputs=[okr_detail_display_md],
            api_name="update_okr_display_on_kr_selection",  # Keep api_name for Gradio
        )
        logging.info("Agentic OKR selection handler setup complete.")