Spaces:
Sleeping
Sleeping
| import logging | |
| import os | |
| import sys | |
| import traceback | |
| from datetime import datetime | |
| import streamlit as st | |
| from jira import JIRA | |
| from dotenv import load_dotenv | |
| from datetime import datetime, timedelta | |
| import pandas as pd | |
| import requests | |
| import json | |
| from difflib import SequenceMatcher | |
| import time | |
# Logging setup.
# First choice: a timestamped debug log under ./logs. If the directory or the
# file cannot be created (e.g. in Hugging Face Spaces), fall back to a
# NullHandler so that importing this module still succeeds.
try:
    log_dir = "logs"
    os.makedirs(log_dir, exist_ok=True)
    _log_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_file = os.path.join(log_dir, f"jira_debug_{_log_stamp}.log")
    _log_handlers = [logging.FileHandler(log_file)]
except (OSError, IOError):
    _log_handlers = [logging.NullHandler()]

# Configure the root logger once with whichever handler could be built.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)

# Module-wide logger used by the functions in this file.
logger = logging.getLogger("jira_integration")
logger.info("Jira integration module loaded")
# Load variables from a .env file into the process environment.
load_dotenv()

# Base URL of the Jira instance; the REST endpoints and the JIRA client in
# this module are all built from it.
JIRA_SERVER = os.environ.get("JIRA_SERVER")

# Tell the user straight away when the server URL is missing.
if not JIRA_SERVER:
    st.error("JIRA_SERVER not found in environment variables. Please check your .env file.")
def init_jira_session():
    """Make sure the Jira-related session-state slots exist.

    Creates ``jira_client`` and ``projects`` in ``st.session_state`` with the
    value ``None`` when they are absent; existing values are left untouched.
    """
    for slot in ('jira_client', 'projects'):
        if slot not in st.session_state:
            st.session_state[slot] = None
def get_projects():
    """Return the projects reported by the Jira client, ordered by key.

    Returns:
        A list of project objects sorted by their ``key`` attribute, or
        ``None`` when there is no Jira client in the session or the request
        fails (the failure is reported through ``st.error``).
    """
    client = st.session_state.jira_client
    if not client:
        return None
    try:
        project_list = list(client.projects())
        project_list.sort(key=lambda project: project.key)
        return project_list
    except Exception as e:
        st.error(f"Error fetching projects: {e}")
        return None
def get_board_configuration(board_id):
    """Fetch the Agile configuration document of a board.

    Args:
        board_id: Identifier of the board, interpolated into the REST URL.

    Returns:
        The decoded JSON body when the server answers with HTTP 200,
        otherwise ``None`` (no client, non-200 status, or an exception, which
        is reported through ``st.error``).
    """
    client = st.session_state.jira_client
    if not client:
        return None
    try:
        endpoint = f"{JIRA_SERVER}/rest/agile/1.0/board/{board_id}/configuration"
        # Reuse the client's own HTTP session for the request.
        reply = client._session.get(endpoint)
        return reply.json() if reply.status_code == 200 else None
    except Exception as e:
        st.error(f"Error fetching board configuration: {e}")
        return None
def get_boards(project_key):
    """List the boards of a project together with type and estimation field.

    Args:
        project_key: Project key or id passed to the client's ``boards`` call.

    Returns:
        A list of dicts with the keys ``id``, ``name``, ``type`` and
        ``estimation_field``; ``type`` is ``'Unknown'`` and
        ``estimation_field`` is ``None`` when the board configuration is not
        available. Returns ``None`` without a client or on error.
    """
    client = st.session_state.jira_client
    if not client:
        return None
    try:
        summaries = []
        for board in client.boards(projectKeyOrID=project_key):
            # A missing configuration is treated like an empty one.
            config = get_board_configuration(board.id) or {}
            if 'estimation' in config:
                estimation_field = config['estimation'].get('field', {}).get('fieldId')
            else:
                estimation_field = None
            summaries.append({
                'id': board.id,
                'name': board.name,
                'type': config.get('type', 'Unknown'),
                'estimation_field': estimation_field
            })
        return summaries
    except Exception as e:
        st.error(f"Error fetching boards: {e}")
        return None
def get_current_sprint(board_id):
    """Return the latest 'RS' sprint among a board's active and future sprints.

    Sprints whose name starts with ``'RS'`` are ranked by name using a natural
    sort, i.e. digit runs are compared as numbers, so ``RS10`` ranks after
    ``RS9`` (a plain string sort would put ``RS9`` last).

    Args:
        board_id: Identifier of the board to query.

    Returns:
        The sprint object with the highest-ranking name, or ``None`` when
        there is no Jira client, the board has no matching sprint, the board
        does not support sprints, or the request fails.
    """
    import re  # local import: only needed for the natural sort key below

    client = st.session_state.jira_client
    if not client:
        return None

    def natural_key(sprint):
        # re.split with a capturing group alternates text / digit chunks, so
        # odd positions are always digit runs and can be compared as ints.
        chunks = re.split(r'(\d+)', sprint.name)
        return [int(chunk) if index % 2 else chunk for index, chunk in enumerate(chunks)]

    try:
        # Get all active and future sprints
        sprints = client.sprints(board_id, state='active,future')
        if not sprints:
            return None
        rs_sprints = [sprint for sprint in sprints if sprint.name.startswith('RS')]
        if not rs_sprints:
            st.warning("No RS sprints found. Available sprints: " + ", ".join([s.name for s in sprints]))
            return None
        return max(rs_sprints, key=natural_key)
    except Exception as e:
        # Boards without sprint support are an expected case, not an error.
        if "board does not support sprints" in str(e).lower():
            return None
        st.error(f"Error fetching current sprint: {str(e)}")
        return None
def get_board_issues(board_id, estimation_field=None):
    """Fetch the current user's issues on a board via the Agile REST API.

    The board endpoint is queried once (at most 200 results, filtered with
    ``assignee = currentUser()``); each returned key is then loaded again
    through the Jira client so callers receive client issue objects.

    Args:
        board_id: Identifier of the board.
        estimation_field: Optional extra field id to request.

    Returns:
        A list of issue objects, or ``None`` without a client, on a non-200
        response, or on any exception (reported through ``st.error``).
    """
    client = st.session_state.jira_client
    if not client:
        return None
    try:
        requested_fields = ['summary', 'status', 'created', 'description', 'issuetype', 'assignee']
        if estimation_field:
            requested_fields.append(estimation_field)
        response = client._session.get(
            f"{JIRA_SERVER}/rest/agile/1.0/board/{board_id}/issue",
            params={
                'maxResults': 200,
                'fields': requested_fields,
                'jql': 'assignee = currentUser()'
            },
        )
        if response.status_code != 200:
            st.error(f"Error fetching board issues: {response.text}")
            return None
        return [client.issue(entry['key']) for entry in response.json()['issues']]
    except Exception as e:
        st.error(f"Error fetching board issues: {e}")
        return None
def get_sprint_issues(board_id, sprint_id, estimation_field=None):
    """Fetch the current user's issues in a sprint, cached for 60 seconds.

    Results are cached in ``st.session_state`` under
    ``sprint_issues_<sprint_id>``. Each entry carries its own fetch timestamp,
    so refreshing one sprint no longer keeps a stale entry of another sprint
    alive. The shared ``last_refresh`` value is still written and still
    honoured: resetting it to an old value forces a refetch.

    Args:
        board_id: Identifier of the board owning the sprint.
        sprint_id: Identifier of the sprint.
        estimation_field: Optional extra field id to request.

    Returns:
        A list of issue objects, or ``None`` without a client, on a non-200
        response, or on any exception (reported through ``st.error``).
    """
    client = st.session_state.jira_client
    if not client:
        return None
    try:
        cache_key = f"sprint_issues_{sprint_id}"
        stamp_key = f"{cache_key}_fetched_at"
        fetched_at = st.session_state.get(stamp_key)
        if cache_key in st.session_state and fetched_at is not None:
            # The entry is only as fresh as the older of its own timestamp
            # and the shared 'last_refresh' marker.
            reference = min(fetched_at, st.session_state.get('last_refresh', datetime.min))
            if (datetime.now() - reference).total_seconds() < 60:
                return st.session_state[cache_key]

        # Use the Agile REST API endpoint to list the sprint's issues
        url = f"{JIRA_SERVER}/rest/agile/1.0/board/{board_id}/sprint/{sprint_id}/issue"
        fields = ['summary', 'status', 'created', 'description', 'issuetype', 'assignee']
        if estimation_field:
            fields.append(estimation_field)
        params = {
            'maxResults': 200,
            'fields': fields,
            'jql': 'assignee = currentUser()'  # Filter for current user's issues
        }
        response = client._session.get(url, params=params)
        if response.status_code != 200:
            st.error(f"Error fetching sprint issues: {response.text}")
            return None

        # Each key is loaded again through the client (one request per issue)
        # so callers receive client issue objects.
        data = response.json()
        issues = [client.issue(issue['key']) for issue in data['issues']]

        # Cache the results together with their own timestamp
        now = datetime.now()
        st.session_state[cache_key] = issues
        st.session_state[stamp_key] = now
        st.session_state['last_refresh'] = now
        return issues
    except Exception as e:
        st.error(f"Error fetching sprint issues: {str(e)}")
        return None
def calculate_points(issues, estimation_field):
    """Summarise story points for a collection of issues.

    Args:
        issues: Iterable of issue objects exposing ``key`` and ``fields``
            (``status.name``, ``issuetype.name``, ``summary``, ``assignee``
            and the estimation attribute). ``None`` or an empty collection
            yields an empty result without reporting an error.
        estimation_field: Name of the estimation attribute, e.g.
            ``customfield_12345``. The value is looked up under the name with
            the ``customfield_`` prefix removed first, then under the full
            name.

    Returns:
        A tuple ``(rows, total, completed, in_progress)`` where ``rows`` is a
        list of per-issue dicts and the other values are point sums. Statuses
        are matched case-insensitively against fixed "done" and "in progress"
        sets. An issue that cannot be processed is reported through
        ``st.error`` and contributes neither a row nor points.
    """
    # The issue fetchers return None on failure; treat that like "no issues".
    if not estimation_field or not issues:
        return [], 0, 0, 0
    try:
        field_id = estimation_field.replace('customfield_', '')
        done_statuses = {'done', 'completed', 'closed'}
        progress_statuses = {'in progress', 'development', 'in development'}
        issues_data = []
        total_points = completed_points = in_progress_points = 0
        for issue in issues:
            try:
                fields = issue.fields
                points = getattr(fields, field_id, None) or getattr(fields, estimation_field, 0)
                points = float(points) if points is not None else 0
                status_name = fields.status.name
                row = {
                    "Key": issue.key,
                    "Type": fields.issuetype.name,
                    "Summary": fields.summary,
                    "Status": status_name,
                    "Story Points": points,
                    "Assignee": fields.assignee.displayName if fields.assignee else "Unassigned"
                }
            except Exception as e:
                # getattr keeps the handler itself from raising on odd objects.
                st.error(f"Error processing points for issue {getattr(issue, 'key', 'Unknown')}: {str(e)}")
                continue
            # Totals are updated only once the whole row was built, so rows
            # and sums always agree.
            total_points += points
            status_key = status_name.lower()
            if status_key in done_statuses:
                completed_points += points
            elif status_key in progress_statuses:
                in_progress_points += points
            issues_data.append(row)
        return issues_data, total_points, completed_points, in_progress_points
    except Exception as e:
        st.error(f"Error calculating points: {str(e)}")
        return [], 0, 0, 0
def create_maintenance_task(project_key, summary, description, issue_type='Task'):
    """Create an issue in Jira and return it.

    Args:
        project_key: Key of the project that receives the issue.
        summary: Issue summary text.
        description: Issue description text.
        issue_type: Name of the issue type (defaults to ``'Task'``).

    Returns:
        The issue object returned by the client, or ``None`` when there is no
        Jira client in the session or the creation fails; both cases are
        reported through ``st.error``.
    """
    client = st.session_state.jira_client
    if not client:
        st.error("Not authenticated with Jira. Please log in first.")
        return None
    payload = {
        'project': {'key': project_key},
        'summary': summary,
        'description': description,
        'issuetype': {'name': issue_type}
    }
    try:
        return client.create_issue(fields=payload)
    except Exception as e:
        st.error(f"Error creating task: {e}")
        return None
def render_jira_login():
    """Render the Jira login form and handle authentication.

    Returns:
        ``True`` when a Jira client is already stored in the session or the
        submitted credentials authenticate and at least one project can be
        fetched; ``False`` otherwise (form not submitted, login temporarily
        blocked, authentication failure, or no projects).

    Side effects on ``st.session_state``:
        * ``jira_username`` / ``jira_password`` hold the last submitted values
          and pre-fill the form.
        * ``jira_client`` / ``projects`` are set on success. If the project
          fetch fails the client is cleared again, so a later rerun does not
          report "Connected" for a login that was rejected.
        * ``login_blocked_until`` is set to five minutes ahead when Jira
          answers with a CAPTCHA challenge.
    """
    # If already authenticated, just return True
    if st.session_state.get('jira_client'):
        st.success("Connected to Jira")
        return True

    # Initialize session state for login form and attempts tracking
    if 'jira_username' not in st.session_state:
        st.session_state.jira_username = ""
    if 'jira_password' not in st.session_state:
        st.session_state.jira_password = ""
    if 'login_blocked_until' not in st.session_state:
        st.session_state.login_blocked_until = None

    # Check if login is temporarily blocked
    blocked_until = st.session_state.login_blocked_until
    if blocked_until:
        now = datetime.now()
        if now < blocked_until:
            # total_seconds() covers the whole interval (``.seconds`` drops days).
            wait_time = int((blocked_until - now).total_seconds())
            st.error(f"Login temporarily blocked due to too many failed attempts. Please wait {wait_time} seconds before trying again.")
            st.info("If you need immediate access, please try logging in directly to Jira in your browser first, complete the CAPTCHA there, then return here.")
            return False
        st.session_state.login_blocked_until = None

    # Create login form
    with st.form(key="jira_login_form"):
        username = st.text_input("Jira Username", value=st.session_state.jira_username, key="username_input")
        password = st.text_input("Password", value=st.session_state.jira_password, type="password", key="password_input")
        submit_button = st.form_submit_button(label="Login")

    if not submit_button:
        return False

    # NOTE(review): the password is kept in session state in plain text so the
    # form can be pre-filled; consider dropping it once other readers of
    # 'jira_password' (if any) are confirmed.
    st.session_state.jira_username = username
    st.session_state.jira_password = password

    try:
        st.session_state.jira_client = JIRA(server=JIRA_SERVER, basic_auth=(username, password))
        projects = get_projects()
        if projects:
            st.session_state.projects = projects
            st.success("Connected to Jira")
            return True
        # Do not keep a client for a login that is reported as failed.
        st.session_state.jira_client = None
        st.error("Failed to fetch projects")
        return False
    except Exception as e:
        if "captcha_challenge" in str(e).lower():
            # Set a 5-minute block on login attempts
            st.session_state.login_blocked_until = datetime.now() + timedelta(minutes=5)
            st.error("Too many failed login attempts. Please try one of the following:")
            st.info(
                "\n"
                "1. Wait 5 minutes before trying again\n"
                "2. Log in to Jira in your browser first, complete the CAPTCHA there, then return here\n"
                "3. Clear your browser cookies and try again\n"
            )
        else:
            st.error(f"Authentication failed: {str(e)}")
        return False
def get_cached_metadata(project_key):
    """Return project metadata, served from the session cache when fresh.

    Metadata is cached per project key in ``st.session_state.metadata_cache``
    with a matching timestamp in ``st.session_state.metadata_cache_timestamp``.
    Entries younger than 30 minutes are returned directly; otherwise
    ``get_project_metadata_fresh`` is called and a truthy result is stored.

    Args:
        project_key: Key of the project whose metadata is requested.

    Returns:
        The cached or freshly fetched metadata (whatever
        ``get_project_metadata_fresh`` returned, including ``None``).
    """
    # Create both cache containers on first use.
    for slot in ('metadata_cache', 'metadata_cache_timestamp'):
        if slot not in st.session_state:
            st.session_state[slot] = {}
    cache = st.session_state.metadata_cache
    stamps = st.session_state.metadata_cache_timestamp

    now = datetime.now()
    max_age = timedelta(minutes=30)  # Cache expires after 30 minutes
    is_fresh = (
        project_key in cache
        and project_key in stamps
        and now - stamps[project_key] < max_age
    )
    if is_fresh:
        logger.info(f"Using cached metadata for project {project_key}")
        return cache[project_key]

    logger.info(f"Fetching fresh metadata for project {project_key}")
    fresh = get_project_metadata_fresh(project_key)
    if fresh:
        # Failed fetches are not cached, so the next call retries.
        cache[project_key] = fresh
        stamps[project_key] = now
        logger.info(f"Updated metadata cache for project {project_key}")
    return fresh
def _log_cascading_options(field_name, field):
    """Log the parent/child option structure of a cascading-select field."""
    logger.info(f"Found cascading select field: {field_name}")
    for parent in field.get('allowedValues') or []:
        parent_value = parent.get('value', 'Unknown')
        logger.info(f" Parent value: {parent_value}")
        if 'cascadingOptions' in parent:
            child_values = [child.get('value') for child in parent['cascadingOptions']]
            logger.info(f" Child values: {child_values}")


def get_project_metadata_fresh(project_key):
    """Fetch create-metadata for the project's 'Story' issue type from Jira.

    Args:
        project_key: Key of the project to inspect.

    Returns:
        A dict with the keys ``project_name``, ``issue_type`` (always
        ``'Story'``), ``required_fields`` and ``all_fields``. Both field
        mappings are keyed by field id and hold ``name``, ``required``,
        ``schema``, ``allowedValues``, ``hasDefaultValue``, ``defaultValue``,
        ``operations`` and ``configuration``. A field without a name is stored
        as ``'Unknown'``, the same placeholder used in the log output.
        Returns ``None`` when there is no Jira client, the project has no
        create-metadata, the 'Story' type is missing, or any exception occurs
        (reported through ``st.error`` together with the traceback).
    """
    logger.info(f"=== Getting fresh metadata for project {project_key} ===")
    client = st.session_state.jira_client
    if not client:
        logger.error("Not authenticated with Jira. Please log in first.")
        st.error("Not authenticated with Jira. Please log in first.")
        return None
    try:
        logger.info("Getting project...")
        project = client.project(project_key)
        logger.info(f"Got project: {project.name}")

        logger.info("Getting createmeta with expanded fields...")
        metadata = client.createmeta(
            projectKeys=project_key,
            expand='projects.issuetypes.fields',
            issuetypeNames='Story'  # Specifically get Story type fields
        )
        logger.info("Got createmeta response")
        if not metadata.get('projects'):
            logger.error(f"No metadata found for project {project_key}")
            st.error(f"No metadata found for project {project_key}")
            return None

        issue_types = metadata['projects'][0].get('issuetypes', [])
        logger.info(f"Available issue types: {[t.get('name') for t in issue_types]}")
        # .get keeps an unnamed issue type from aborting the lookup.
        story_type = next((t for t in issue_types if t.get('name') == 'Story'), None)
        if not story_type:
            logger.error("Story issue type not found in project")
            st.error("Story issue type not found in project")
            return None

        logger.info("Processing fields...")
        required_fields = {}
        all_fields = {}
        logger.info("Available fields in Story type:")
        for field_id, field in story_type['fields'].items():
            field_name = field.get('name', 'Unknown')
            schema = field.get('schema', {})
            field_type = schema.get('type', 'Unknown')
            logger.info(f"Field: {field_name} (ID: {field_id}, Type: {field_type})")

            # Store complete field information including schema and allowed values
            required = field.get('required', False)
            all_fields[field_id] = {
                'name': field_name,
                'required': required,
                'schema': schema,
                'allowedValues': field.get('allowedValues', []),
                'hasDefaultValue': field.get('hasDefaultValue', False),
                'defaultValue': field.get('defaultValue'),
                'operations': field.get('operations', []),
                'configuration': field.get('configuration', {})
            }

            if schema.get('type') == 'option-with-child':
                _log_cascading_options(field_name, field)

            # Required fields share the same record object as all_fields.
            if required:
                required_fields[field_id] = all_fields[field_id]
                logger.info(f"Required field: {field_name}")

        logger.info(f"Found {len(all_fields)} total fields, {len(required_fields)} required fields")
        metadata_result = {
            'project_name': project.name,
            'issue_type': 'Story',
            'required_fields': required_fields,
            'all_fields': all_fields
        }
        logger.info("Successfully processed project metadata")
        return metadata_result
    except Exception as e:
        logger.exception(f"Error getting project metadata: {str(e)}")
        st.error(f"Error getting project metadata: {str(e)}")
        # The message is already shown above; only add the traceback here.
        st.error("Full error details:")
        st.code(traceback.format_exc(), language="python")
        return None
def get_project_metadata(project_key):
    """Return metadata for ``project_key`` by delegating to the cached lookup.

    Args:
        project_key: Key of the project whose metadata is requested.

    Returns:
        Whatever ``get_cached_metadata`` returns for the key.
    """
    metadata = get_cached_metadata(project_key)
    return metadata
def generate_task_content(filtered_scenarios_df):
    """Build the summary and description of a maintenance task.

    Args:
        filtered_scenarios_df: pandas DataFrame with the columns
            ``Environment``, ``Functional area``, ``Scenario Name`` and
            ``Error Message``. Environment and functional area are taken
            from the first row.

    Returns:
        ``(summary, description)``. The description lists every scenario,
        numbered from 1, followed by its error message. Returns
        ``(None, None)`` when the frame is missing or empty, or when an
        exception occurs; both cases are reported through ``st.error``.
    """
    try:
        # Fail with a clear message instead of an opaque indexer error.
        if filtered_scenarios_df is None or filtered_scenarios_df.empty:
            st.error("Error generating task content: no scenarios provided")
            return None, None

        environment = filtered_scenarios_df['Environment'].iloc[0]
        functional_area = filtered_scenarios_df['Functional area'].iloc[0]
        summary = f"Maintenance: {environment} - {functional_area}"

        # Build all entries first and join once instead of growing a string.
        scenario_rows = zip(
            filtered_scenarios_df['Scenario Name'],
            filtered_scenarios_df['Error Message'],
        )
        entries = [
            f"{number}. {name}\n Error: {error}\n\n"
            for number, (name, error) in enumerate(scenario_rows, 1)
        ]
        description = "Performing maintenance on the following scenarios failing :\n\n" + "".join(entries)
        return summary, description
    except Exception as e:
        st.error(f"Error generating task content: {str(e)}")
        return None, None
def get_regression_board(project_key):
    """Locate the project's 'Regression Sprints' scrum board.

    Args:
        project_key: Key of the project whose boards are searched.

    Returns:
        The board dict (as produced by ``get_boards``) whose name equals
        ``'regression sprints'`` and whose type equals ``'scrum'``, both
        compared case-insensitively. Returns ``None`` when no boards are
        available or none matches; in the latter case the available boards
        are listed through ``st.error``.
    """
    boards = get_boards(project_key)
    if not boards:
        return None
    for board in boards:
        is_regression = board['name'].lower() == 'regression sprints'
        if is_regression and board['type'].lower() == 'scrum':
            return board
    available = ", ".join([f"{b['name']} ({b['type']})" for b in boards])
    st.error("Could not find the 'Regression Sprints' board. Available boards: " + available)
    return None
def get_field_dependencies():
    """Return allowed values and dependency info for the tracked RS fields.

    The result is computed once from the metadata of project ``RS`` and then
    kept in ``st.session_state.field_dependencies``.

    Returns:
        A dict keyed by ``'Customer'``, ``'Environment'`` and
        ``'Functional Areas'``. Each entry holds ``field_id``, ``values``
        (allowed values of the field) and ``dependencies`` (empty unless the
        field metadata carries a ``dependency`` entry). Returns ``None`` when
        the metadata is unavailable or an exception occurs.
    """
    if 'field_dependencies' in st.session_state:
        return st.session_state.field_dependencies
    try:
        # Get project metadata for RS project
        metadata = get_project_metadata("RS")
        if not metadata:
            return None

        tracked_fields = (
            ('Customer', 'customfield_10427'),
            ('Environment', 'customfield_14157'),  # Updated field ID
            ('Functional Areas', 'customfield_15303'),  # Updated field ID
        )
        dependencies = {
            label: {'field_id': field_id, 'values': [], 'dependencies': {}}
            for label, field_id in tracked_fields
        }

        for entry in dependencies.values():
            known_fields = metadata['all_fields']
            if entry['field_id'] not in known_fields:
                continue
            field_data = known_fields[entry['field_id']]
            if 'allowedValues' in field_data:
                # Prefer an option's 'value', then its 'name'.
                entry['values'] = [
                    option.get('value', option.get('name', ''))
                    for option in field_data['allowedValues']
                    if isinstance(option, dict)
                ]
            if 'dependency' in field_data:
                dep_field = field_data['dependency']
                entry['dependencies'] = {
                    'field': dep_field['field']['name'],
                    'field_id': dep_field['field']['id'],
                    'values': dep_field.get('values', [])
                }

        # Cache the dependencies
        st.session_state.field_dependencies = dependencies
        return dependencies
    except Exception as e:
        st.error(f"Error fetching field dependencies: {e}")
        return None
def get_dependent_field_value(field_name, parent_value=None):
    """Pick a value for ``field_name``, honouring a parent-field dependency.

    Args:
        field_name: Key into the mapping returned by
            ``get_field_dependencies``.
        parent_value: Optional value of the parent field.

    Returns:
        The mapped value whose ``parent`` equals ``parent_value`` when the
        field has dependency mappings; otherwise the first allowed value of
        the field. ``None`` when the field is unknown, the dependencies are
        unavailable, or the field has no allowed values.
    """
    dependencies = get_field_dependencies()
    if not dependencies or field_name not in dependencies:
        return None
    info = dependencies[field_name]

    # Try the dependency mappings first.
    if parent_value and info['dependencies']:
        for mapping in info['dependencies'].get('values', []):
            if mapping.get('parent') == parent_value:
                return mapping.get('value')

    # No dependency or no match: fall back to the first allowed value.
    candidates = info['values']
    if candidates:
        return candidates[0]
    return None
def _write_allowed_values(allowed_values):
    """Render allowed values, expanding cascading options into children."""
    for value in allowed_values:
        if not isinstance(value, dict):
            st.write(f" - {value}")
        elif 'cascadingOptions' in value:
            st.write(f" - {value.get('value', 'Unknown')}")
            st.write(" Child options:")
            for child in value['cascadingOptions']:
                st.write(f" - {child.get('value', 'Unknown')}")
        else:
            st.write(f" - {value.get('value', value.get('name', 'Unknown'))}")


def _write_cascading_field(field):
    """Render a cascading-select field as parent options with children."""
    if not field.get('allowedValues'):
        return
    st.write("Cascading options:")
    for value in field['allowedValues']:
        if not isinstance(value, dict):
            continue
        st.write(f"- {value.get('value', 'Unknown')}")
        if 'cascadingOptions' in value:
            st.write(" Child options:")
            for child in value['cascadingOptions']:
                st.write(f" - {child.get('value', 'Unknown')}")


def display_project_fields():
    """Display the fields available for issue creation in project ``RS``.

    Writes to the Streamlit page: the required fields with their allowed
    values, the Customer / Functional Areas / Environment custom fields, any
    ``dependency`` information those fields carry, and all remaining custom
    fields. Nothing is rendered when no metadata is available.

    The Functional Areas field is resolved as ``customfield_15303`` first,
    falling back to ``customfield_13100``, so this view uses the same field
    id as ``get_field_dependencies`` and ``get_functional_area_values``.
    """
    project_key = "RS"  # Using the fixed project key
    metadata = get_project_metadata(project_key)
    if not metadata:
        return
    all_fields = metadata['all_fields']

    st.subheader("Project Fields")

    # Display required fields
    st.write("### Required Fields")
    for field_id, field in metadata['required_fields'].items():
        st.write(f"- {field['name']} ({field_id})")
        if field.get('allowedValues'):
            st.write(" Allowed values:")
            _write_allowed_values(field['allowedValues'])

    # Display custom fields with dependencies
    st.write("### Custom Fields and Dependencies")

    # Customer field - Cascading Select
    customer_id = 'customfield_10427'
    st.write(f"\n#### Customer ({customer_id})")
    cust_field = all_fields.get(customer_id, {})
    _write_cascading_field(cust_field)

    # Functional Areas field - Cascading Select (new id first, then the old one)
    functional_id = next(
        (fid for fid in ('customfield_15303', 'customfield_13100') if fid in all_fields),
        'customfield_13100',
    )
    st.write(f"\n#### Functional Areas ({functional_id})")
    func_field = all_fields.get(functional_id, {})
    _write_cascading_field(func_field)

    # Environment field
    environment_id = 'customfield_14157'
    st.write(f"\n#### Environment ({environment_id})")
    env_field = all_fields.get(environment_id, {})
    if env_field.get('allowedValues'):
        st.write("Allowed values:")
        for value in env_field['allowedValues']:
            if isinstance(value, dict):
                st.write(f" - {value.get('value', value.get('name', 'Unknown'))}")

    # Display dependencies
    dependent_fields = [
        ('Environment', env_field),
        ('Functional Areas', func_field),
        ('Customer', cust_field)
    ]
    if any(field.get('dependency') for _, field in dependent_fields):
        st.write("\n### Field Dependencies")
        for field_name, field in dependent_fields:
            dep = field.get('dependency')
            if not dep:
                continue
            st.write(f"\n{field_name} depends on:")
            st.write(f" Field: {dep['field']['name']} ({dep['field']['id']})")
            if dep.get('values'):
                st.write(" Value mappings:")
                for mapping in dep['values']:
                    parent_value = mapping.get('parent', 'Unknown')
                    child_value = mapping.get('value', 'Unknown')
                    st.write(f" - When parent is '{parent_value}' → '{child_value}'")

    # Display other custom fields (everything not shown in its own section)
    st.write("\n### Other Custom Fields")
    excluded_fields = {environment_id, functional_id, customer_id}
    for field_id, field in all_fields.items():
        if not field_id.startswith('customfield_') or field_id in excluded_fields:
            continue
        st.write(f"- {field['name']} ({field_id})")
        if field.get('allowedValues'):
            st.write(" Allowed values:")
            _write_allowed_values(field['allowedValues'])
def get_closest_match(target, choices, threshold=60):
    """Find the choice most similar to ``target`` using fuzzy matching.

    Both strings are normalised before comparison: lower-cased, hyphens
    turned into spaces, and every run of whitespace collapsed to a single
    space (leading/trailing whitespace removed). Similarity is
    ``difflib.SequenceMatcher.ratio()`` scaled to 0-100.

    Args:
        target: String to look up. ``None`` yields ``None``.
        choices: Candidate strings; the original (un-normalised) candidate
            is returned.
        threshold: Minimum similarity (0-100) for a match to be accepted.

    Returns:
        The best-scoring choice when its similarity is at least
        ``threshold`` (the first one on ties), otherwise ``None``. ``None``
        is also returned for empty ``choices`` or when an unexpected error
        occurs (reported through ``st.error``).
    """
    if not choices or target is None:
        return None
    try:
        def normalize(text):
            # split()/join collapses any whitespace run and strips the ends.
            return ' '.join(text.lower().replace('-', ' ').split())

        # Normalise the target once instead of once per candidate.
        wanted = normalize(target)
        scored = [
            (choice, SequenceMatcher(None, wanted, normalize(choice)).ratio() * 100)
            for choice in choices
        ]
        best_choice, best_score = max(scored, key=lambda pair: pair[1])
        return best_choice if best_score >= threshold else None
    except Exception as e:
        st.error(f"Error in fuzzy matching: {str(e)}")
        return None
def get_functional_area_values(metadata):
    """Extract the selectable functional-area values from project metadata.

    The functional-area field is located by trying the known field ids in
    priority order (newest first); only when none of them is present is the
    first field whose name contains "functional area" used.

    Args:
        metadata: Mapping with an ``all_fields`` entry keyed by field id, as
            produced by ``get_project_metadata_fresh``.

    Returns:
        A list of values: for options with ``cascadingOptions`` the child
        values, otherwise the option's own ``value``. Returns an empty list
        when the metadata is missing, has no ``all_fields`` entry, or
        contains no functional-area field.
    """
    logger.info("=== Starting get_functional_area_values ===")
    if not metadata:
        logger.error("No metadata provided")
        return []
    if 'all_fields' not in metadata:
        logger.error("No 'all_fields' in metadata")
        logger.debug(f"Available metadata keys: {list(metadata.keys())}")
        return []
    all_fields = metadata['all_fields']

    # Log all available field IDs for debugging
    logger.info("Available fields:")
    for field_id, field in all_fields.items():
        field_name = field.get('name', 'Unknown')
        field_type = field.get('schema', {}).get('type', 'Unknown')
        logger.info(f" {field_name} (ID: {field_id}, Type: {field_type})")

    # Candidate field IDs for functional areas, highest priority first
    functional_area_field_ids = [
        'customfield_15303',  # New field ID
        'customfield_13100',  # Old field ID
        'customfield_13101'   # Another possible variation
    ]

    func_field = None
    matched_id = None
    for candidate_id in functional_area_field_ids:
        if candidate_id in all_fields:
            func_field, matched_id = all_fields[candidate_id], candidate_id
            break
    if func_field is None:
        # Fall back to a name match; 'or' guards against a None name.
        for field_id, field in all_fields.items():
            if 'functional area' in (field.get('name') or '').lower():
                func_field, matched_id = field, field_id
                break

    if not func_field:
        logger.error("Could not find functional area field in metadata")
        logger.info("Available field names:")
        for field_id, field in all_fields.items():
            logger.info(f" {field.get('name', 'Unknown')} ({field_id})")
        return []
    logger.info(f"Found functional area field: {func_field.get('name')} (ID: {matched_id})")

    # Check field type
    field_type = func_field.get('schema', {}).get('type')
    logger.info(f"Functional area field type: {field_type}")

    allowed_values = []
    if 'allowedValues' in func_field:
        logger.info("Processing allowed values...")
        for parent in func_field['allowedValues']:
            if not isinstance(parent, dict):
                continue
            parent_value = parent.get('value', 'Unknown')
            logger.info(f"Processing parent value: {parent_value}")
            if 'cascadingOptions' in parent:
                # Cascading option: only the children are selectable values.
                for child in parent['cascadingOptions']:
                    if isinstance(child, dict) and 'value' in child:
                        allowed_values.append(child['value'])
                        logger.debug(f"Added child value: {child['value']}")
            elif 'value' in parent:
                allowed_values.append(parent['value'])
                logger.debug(f"Added value: {parent['value']}")

    logger.info(f"Found {len(allowed_values)} allowed values")
    if allowed_values:
        logger.info(f"Sample of allowed values: {allowed_values[:5]}")
    else:
        logger.warning("No allowed values found in the field")
    return allowed_values
def calculate_story_points(scenario_count):
    """Map a number of scenarios to a story-point estimate.

    Args:
        scenario_count: Number of scenarios covered by the task.

    Returns:
        1 for up to 3 scenarios, 2 for up to 5, 3 for up to 9, 5 for up to
        15, and 8 for anything larger.
    """
    # (inclusive upper bound, points), checked in ascending order
    brackets = ((3, 1), (5, 2), (9, 3), (15, 5))
    for upper_bound, points in brackets:
        if scenario_count <= upper_bound:
            return points
    return 8
def map_functional_area(functional_area, metadata):
    """Map a functional area label to an allowed (parent, child) pair.

    The label is split on ' - ' and every contiguous run of parts, joined
    with '-', is tried against the child options of the cascading field
    ``customfield_13100`` - first as-is, then with spaces removed, then with
    each known category prefix. If nothing matches, a hard-coded default is
    chosen from the first part of the label.

    Args:
        functional_area: Label such as 'Data Exchange - Enquiries - Reports'.
        metadata: Project metadata dict holding field definitions under
            'all_fields'.

    Returns:
        A ``(parent_value, child_value)`` tuple.

    Raises:
        ValueError: If either argument is empty or the functional area field
            (with its 'allowedValues') is missing from the metadata.
    """
    if not metadata or not functional_area:
        logger.error("No metadata or functional area provided")
        raise ValueError("Metadata and functional area are required")

    # .get() so that metadata without 'all_fields' raises the ValueError
    # below rather than an unrelated KeyError.
    func_field = (metadata.get('all_fields') or {}).get('customfield_13100', {})
    if not func_field or 'allowedValues' not in func_field:
        logger.error("Could not find functional area field in metadata")
        raise ValueError("Functional area field not found in metadata")

    # child value -> parent value, for O(1) lookup of candidate names.
    allowed_values = {}
    for parent in func_field['allowedValues']:
        if not isinstance(parent, dict):
            continue
        parent_value = parent.get('value')
        if not parent_value:
            continue
        # Other functions in this file read child options from
        # 'cascadingOptions' while this one read 'children'; accept both so
        # they agree. NOTE(review): confirm which key the metadata really uses.
        children = parent.get('children') or parent.get('cascadingOptions') or []
        for child in children:
            if isinstance(child, dict) and 'value' in child:
                allowed_values[child['value']] = parent_value

    logger.info(f"Input functional area: {functional_area}")

    parts = [p.strip() for p in functional_area.split(' - ')]
    logger.info(f"Split into parts: {parts}")

    categories = ('Services', 'FIN', 'WARPSPEED')

    # Try every contiguous run of parts joined with '-'.
    for i in range(len(parts)):
        for j in range(i + 1, len(parts) + 1):
            test_value = '-'.join(parts[i:j])
            test_value_no_spaces = test_value.replace(' ', '')
            logger.info(f"Trying combination: {test_value}")

            if test_value in allowed_values:
                logger.info(f"Found exact match: {test_value}")
                return allowed_values[test_value], test_value
            if test_value_no_spaces in allowed_values:
                logger.info(f"Found match without spaces: {test_value_no_spaces}")
                return allowed_values[test_value_no_spaces], test_value_no_spaces

            # Same candidate again, prefixed with each known category.
            for category in categories:
                category_value = f"{category}-{test_value}"
                category_value_no_spaces = category_value.replace(' ', '')
                if category_value in allowed_values:
                    logger.info(f"Found category match: {category_value}")
                    return allowed_values[category_value], category_value
                if category_value_no_spaces in allowed_values:
                    logger.info(f"Found category match without spaces: {category_value_no_spaces}")
                    return allowed_values[category_value_no_spaces], category_value_no_spaces

    # No match: pick a default from the first part of the label.
    first_part = parts[0].upper()
    if 'SERVICE' in first_part:  # also covers 'SERVICES'
        logger.info("No exact match found, defaulting to Services-Platform")
        return "R&I", "Services-Platform"
    if 'FIN' in first_part:
        logger.info("No exact match found, defaulting to FIN-Parameters")
        return "R&I", "FIN-Parameters"
    if 'WARPSPEED' in first_part:
        logger.info("No exact match found, defaulting to WARPSPEED-Parameters")
        return "R&I", "WARPSPEED-Parameters"

    logger.warning(f"No suitable match found for '{functional_area}', defaulting to Data Exchange")
    return "R&I", "Data Exchange"
def get_customer_field_values(metadata):
    """Extract customer parent values and their child options from metadata.

    Reads the cascading field ``customfield_10427`` from
    ``metadata['all_fields']``.

    Args:
        metadata: Project metadata dict; may be None or lack 'all_fields'.

    Returns:
        Dict mapping each parent value to a list of its non-empty child
        values (an empty list when the parent has none). Returns an empty
        dict when the metadata or the field is unavailable.
    """
    if not metadata or 'all_fields' not in metadata:
        return {}

    customer_field = (metadata['all_fields'] or {}).get('customfield_10427') or {}
    customer_values = {}
    for parent in customer_field.get('allowedValues') or []:
        if not isinstance(parent, dict):
            continue
        parent_value = parent.get('value')
        if not parent_value:
            continue
        # The file reads child options under both 'cascadingOptions' and
        # 'children' in different places; accept either here.
        # NOTE(review): confirm which key the metadata really uses.
        options = parent.get('cascadingOptions') or parent.get('children') or []
        # Skip non-dict entries instead of crashing on .get().
        customer_values[parent_value] = [
            child['value']
            for child in options
            if isinstance(child, dict) and child.get('value')
        ]
    return customer_values
def map_customer_value(environment_value, customer_values):
    """Map an environment name to a customer (parent, child) pair.

    Rules, in order:
      1. Empty input or no customer values -> the default pair.
      2. Environments naming legalwise/scorpion/lifewise/talksure -> parent
         'ILR' with the environment itself as the child.
      3. Environments starting with 'RI' -> the default parent with child
         'R&I <rest>' (or 'R&I General' when nothing follows 'RI').
      4. Otherwise an exact, then substring, match against the default
         parent's children.
      5. The default pair.

    Args:
        environment_value: Environment name string, e.g. 'RI2008'.
        customer_values: Dict of parent value -> list of child values.

    Returns:
        A ``(parent_value, child_value)`` tuple.
    """
    default_parent = "MIP Research and Innovation"
    default_child = "R&I General"

    if not environment_value or not customer_values:
        return default_parent, default_child

    env_value = environment_value.strip()
    if not env_value:
        # A whitespace-only value strips to '', which is a substring of every
        # child and would otherwise "match" the first child in step 4.
        return default_parent, default_child

    lowered = env_value.lower()
    if any(name in lowered for name in ('legalwise', 'scorpion', 'lifewise', 'talksure')):
        parent_value = "ILR"
        child_value = env_value  # the original environment name is the child
        logger.info(f"Mapped {env_value} to ILR parent with child {child_value}")
        return parent_value, child_value

    if env_value.startswith('RI'):
        suffix = env_value[2:].strip()
        child_value = f"R&I {suffix}" if suffix else default_child
        logger.info(f"Mapped RI environment {env_value} to {default_parent} parent with child {child_value}")
        return default_parent, child_value

    # Only the default parent's children are considered for matching.
    children = customer_values.get(default_parent) or []
    if env_value in children:
        return default_parent, env_value
    for child in children:
        if env_value in child or child in env_value:
            return default_parent, child

    logger.warning(f"No specific mapping found for {env_value}, using defaults")
    return default_parent, default_child
def create_regression_task(project_key, summary, description, environment, filtered_scenarios_df):
    """Create a regression Story in Jira and add it to the active sprint.

    Steps: fetch project metadata, map the environment to the customer
    field, take the Jira client from the Streamlit session, find the active
    sprint of the regression board, derive story points and functional area
    from the scenarios, create the issue, then try to add it to the sprint.

    Args:
        project_key: Jira project key the issue is created in.
        summary: Issue summary text.
        description: Issue description text.
        environment: Environment name used to pick the customer field values.
        filtered_scenarios_df: DataFrame of failed scenarios. Its row count
            drives story points and the scenario-count field; its
            'Functional area' column, when present, drives the functional
            area field.

    Returns:
        The created issue object, or None when any step fails. Failures are
        logged and reported through Streamlit.
    """
    logger.info("=== Starting create_regression_task function ===")
    logger.info(f"Project: {project_key}, Summary: {summary}, Environment: {environment}")

    # Check before touching .shape: that access used to sit outside the
    # try block and raised AttributeError for a missing frame.
    if filtered_scenarios_df is None:
        error_msg = "No scenarios provided for task creation"
        logger.error(error_msg)
        st.error(error_msg)
        return None

    logger.debug(f"Entering create_regression_task with project_key={project_key}, summary={summary}, environment={environment}, DF_shape={filtered_scenarios_df.shape}")
    logger.info(f"Filtered DF shape: {filtered_scenarios_df.shape}")

    try:
        # Metadata is needed first to resolve the allowed field values.
        metadata = get_project_metadata(project_key)
        if not metadata:
            error_msg = "Could not get project metadata"
            logger.error(error_msg)
            st.error(error_msg)
            return None

        customer_values = get_customer_field_values(metadata)
        parent_value, child_value = map_customer_value(environment, customer_values)
        logger.info(f"Mapped customer values - Parent: {parent_value}, Child: {child_value}")

        # The session key can exist with a None value before login, so test
        # the value rather than the key's presence.
        jira_client = st.session_state.get("jira_client")
        if not jira_client:
            error_msg = "No Jira client available. Please connect to Jira first."
            logger.error(error_msg)
            st.error(error_msg)
            return None
        logger.info("Got Jira client from session state")

        # Other callers in this file treat the board lookup as possibly
        # empty, so check it before indexing.
        board = get_regression_board(project_key)
        if not board:
            error_msg = "No regression board found"
            logger.error(error_msg)
            st.error(error_msg)
            return None

        active_sprint = get_current_sprint(board['id'])
        if not active_sprint:
            error_msg = "No active sprint found"
            logger.error(error_msg)
            st.error(error_msg)
            return None
        logger.info(f"Found active sprint: {active_sprint.name} (ID: {active_sprint.id})")

        functional_areas = []
        try:
            if "Functional area" in filtered_scenarios_df.columns:
                functional_areas = filtered_scenarios_df["Functional area"].unique().tolist()
            logger.info(f"Extracted functional areas: {functional_areas}")
        except Exception as e:
            logger.exception(f"Error extracting functional areas: {str(e)}")
            st.error(f"Error extracting functional areas: {str(e)}")
            return None

        scenario_count = len(filtered_scenarios_df)
        story_points = calculate_story_points(scenario_count)
        logger.info(f"Calculated story points: {story_points}")

        # Only the first functional area is mapped; "Data Exchange" is used
        # when the frame provides none.
        functional_area_parent, functional_area_child = map_functional_area(
            functional_areas[0] if functional_areas else "Data Exchange",
            metadata
        )
        logger.info(f"Mapped functional area to parent: {functional_area_parent}, child: {functional_area_child}")

        issue_dict = {
            "project": {"key": project_key},
            "summary": summary,
            "description": description,
            "issuetype": {"name": "Story"},
            "components": [{"name": "Maintenance (Regression)"}],
            "customfield_10427": {
                "value": parent_value,
                "child": {
                    "value": child_value
                }
            },
            "customfield_12730": {"value": "Non-Business Critical"},  # Regression Type field
            "customfield_13430": {"value": str(scenario_count)},  # Number of Scenarios
            "customfield_13100": {
                "value": functional_area_parent,
                "child": {
                    "value": functional_area_child
                }
            },
            "assignee": {"name": st.session_state.jira_username},
            "customfield_10002": story_points  # Story Points field
        }
        logger.info(f"Issue dictionary prepared: {issue_dict}")

        logger.info("Attempting to create issue in Jira...")
        try:
            new_issue = jira_client.create_issue(fields=issue_dict)
            logger.info(f"Issue created successfully: {new_issue.key}")

            # Failing to add to the sprint only warns; the issue still exists.
            try:
                logger.info(f"Attempting to add issue {new_issue.key} to sprint {active_sprint.id}...")
                jira_client.add_issues_to_sprint(active_sprint.id, [new_issue.key])
                logger.info(f"Added issue {new_issue.key} to sprint {active_sprint.name}")
            except Exception as sprint_error:
                logger.exception(f"Failed to add issue to sprint: {str(sprint_error)}")
                st.warning(f"⚠️ Could not add task to sprint. Error: {str(sprint_error)}")

            st.success(f"✅ Task created successfully: {new_issue.key}")
            return new_issue
        except Exception as create_error:
            error_message = str(create_error)
            logger.exception(f"Failed to create issue: {error_message}")

            # Show the HTTP response details when the exception carries them.
            shown_to_user = False
            try:
                if hasattr(create_error, 'response'):
                    status_code = getattr(create_error.response, 'status_code', 'N/A')
                    logger.error(f"Response status code: {status_code}")
                    if hasattr(create_error.response, 'text'):
                        response_text = create_error.response.text
                        logger.error(f"Response text: {response_text}")
                        st.error(f"❌ Error creating task in Jira (Status: {status_code}):")
                        st.error(response_text)
                        shown_to_user = True
            except Exception as extract_error:
                logger.exception(f"Error extracting response details: {str(extract_error)}")

            # Without response details the user would otherwise see no reason.
            if not shown_to_user:
                st.error(f"❌ Error creating task in Jira: {error_message}")
            return None
    except Exception as e:
        error_message = f"❌ Unexpected error in create_regression_task: {str(e)}"
        # logger.exception already records the traceback, so it is not
        # logged a second time.
        logger.exception(error_message)
        st.error(error_message)
        return None
def create_test_data():
    """Build a sample DataFrame of seven failed scenarios for development.

    All rows share one environment, functional area, status, duration and
    start time. Task-level sample values are attached under
    ``df.attrs['metadata']``.
    """
    row_count = 7
    link_failure = 'AssertionError [ERR_ASSERTION]: Link validation failed'

    scenario_names = [
        'Add Missions Error Handling - Existing Code',
        'Add Missions Error Handling - Incorrect Max Iterations',
        'Add Missions Error Handling - No Badges',
        'Add Missions Success - Individual',
        'Add Missions Success - Team',
        'Add Missions Success - Individual Iteration',
        'Add Missions Success - Team Max Iterations',
    ]
    error_messages = [
        'AssertionError [ERR_ASSERTION]: Error handling for existing code failed',
        'AssertionError [ERR_ASSERTION]: Error handling for max iterations failed',
        'AssertionError [ERR_ASSERTION]: Error handling for missing badges failed',
    ] + [link_failure] * 4

    # One timestamp shared by every row.
    started_at = datetime.now()

    columns = {}
    columns['Environment'] = ['RI2008'] * row_count
    columns['Functional area'] = ['Data Exchange - Enquiries - Reports'] * row_count
    columns['Scenario Name'] = scenario_names
    columns['Error Message'] = error_messages
    columns['Status'] = ['FAILED'] * row_count
    columns['Time spent(m:s)'] = ['02:30'] * row_count
    columns['Start datetime'] = [started_at] * row_count

    frame = pd.DataFrame(columns)

    # Sample task-level values carried alongside the frame.
    frame.attrs['metadata'] = {
        'Customer': 'MIP Research and Innovation - R&I 2008',
        'Sprint': 'RS Sprint 195',
        'Story Points': 5,
        'Regression Type': 'Non-Business Critical',
        'Component': 'Maintenance (Regression)',
        'Priority': 'Lowest',
        'Type': 'Story',
        'Labels': 'None',
        'Assignee': 'Daniel Akinsola',
        'Reporter': 'Daniel Akinsola',
    }
    return frame
def process_failures_button(filtered_scenarios_df, environment=None):
    """Render the generate / review / create workflow for a regression task.

    Builds display metadata from the failed scenarios, then, depending on
    session state, either shows the link to the last created task, offers to
    generate task content, or shows the generated content with buttons to
    regenerate it or create the Jira task.

    Args:
        filtered_scenarios_df: DataFrame of failed scenarios. The
            'Environment' and 'Functional area' columns are used when present.
        environment: Environment name; when None, the first value of the
            'Environment' column is used if available.
    """
    # Fixed project: the regression sprint board lives in RS.
    project_key = "RS"
    project_name = "RS - Regression"

    scenario_count = len(filtered_scenarios_df)

    # .iloc[0] raises on an empty frame, so only read it when rows exist.
    if environment is None and scenario_count > 0 and 'Environment' in filtered_scenarios_df.columns:
        environment = filtered_scenarios_df['Environment'].iloc[0]

    functional_area = 'R&I'
    if 'Functional area' in filtered_scenarios_df.columns:
        functional_areas = filtered_scenarios_df['Functional area'].unique()
        if len(functional_areas) > 0:
            functional_area = functional_areas[0]

    # Main service category = first two ' - ' separated parts joined by '-'.
    # Slicing tolerates single-part areas (including the 'R&I' fallback),
    # which used to raise IndexError on [1].
    area_parts = str(functional_area).split(' - ')
    service_category = '-'.join(area_parts[:2]).replace(' ', '')

    # Format the environment for display; tolerate a missing environment.
    env_text = '' if environment is None else str(environment).strip()
    env_number = env_text[2:].strip() if env_text.startswith('RI') else env_text
    if env_number.startswith('R&I'):
        env_value = env_number
    elif env_number:
        env_value = f"R&I {env_number}"
    else:
        # Same default child that map_customer_value uses for a bare 'RI'.
        env_value = "R&I General"

    board = get_regression_board(project_key)
    sprint = get_current_sprint(board['id']) if board else None
    sprint_name = "No Active Sprint"
    if sprint:
        sprint_name = sprint.name
        st.write(f"Found active sprint: {sprint_name}")
    else:
        st.warning("No active sprint found")

    metadata = {
        'Project Key': project_key,
        'Project': project_name,
        'Issue Type': 'Story',
        'Customer': 'MIP Research and Innovation',
        'Environment': env_value,
        'Functional Areas': service_category,
        'Sprint': sprint_name,
        'Story Points': calculate_story_points(scenario_count),
        'Regression Type': 'Non-Business Critical',
        'Number of Scenarios': min(scenario_count, 50)  # displayed count is capped at 50
    }

    # Initialise the session keys this workflow relies on.
    session_defaults = {
        'task_content': None,
        'task_created': False,
        'created_task': None,
        'show_success': False,
        'last_task_key': None,
        'last_task_url': None,
    }
    for state_key, default_value in session_defaults.items():
        if state_key not in st.session_state:
            st.session_state[state_key] = default_value

    if sprint:
        st.session_state.current_sprint = sprint

    # A task was just created: show its link and stop here.
    if st.session_state.show_success and st.session_state.last_task_key:
        st.success("✅ Task created successfully!")
        success_html = "\n".join([
            "<div style='padding: 10px; border-radius: 5px; border: 1px solid #90EE90; margin: 10px 0;'>",
            "<h3 style='margin: 0; color: #90EE90;'>Task Details</h3>",
            f"<p style='margin: 10px 0;'>Task Key: {st.session_state.last_task_key}</p>",
            f"<a href='{st.session_state.last_task_url}' target='_blank' "
            "style='background-color: #90EE90; color: black; padding: 5px 10px; "
            "border-radius: 3px; text-decoration: none; display: inline-block;'>",
            "View Task in Jira",
            "</a>",
            "</div>",
        ])
        st.markdown(success_html, unsafe_allow_html=True)

        if st.button("Create Another Task", key="create_another"):
            # Reset all task-related state and start over.
            for state_key, default_value in session_defaults.items():
                st.session_state[state_key] = default_value
            st.rerun()
        return

    if st.button("Generate Task Content"):
        with st.spinner("Generating task content..."):
            summary, description = generate_task_content(filtered_scenarios_df)
            if summary and description:
                st.session_state.task_content = {
                    'summary': summary,
                    'description': description,
                    'environment': environment,
                    'metadata': metadata
                }
            else:
                st.error("Failed to generate task content. Please try again.")
                return

    if st.session_state.task_content:
        content = st.session_state.task_content
        box_style = (
            "background-color: #f0f2f6; padding: 10px; border-radius: 5px; "
            "border: 1px solid #e0e0e0; color: #0f1629;"
        )
        with st.expander("Generated Task Content", expanded=True):
            st.markdown("### Summary")
            st.markdown(
                f"<div style='{box_style}'>\n{content['summary']}\n</div>",
                unsafe_allow_html=True,
            )

            st.markdown("### Description")
            st.markdown(
                f"<div style='{box_style} white-space: pre-wrap;'>\n{content['description']}\n</div>",
                unsafe_allow_html=True,
            )

            display_functional_areas(content['metadata'])

            st.markdown("### Fields to be Set")
            # Use the metadata stored with the content, which may predate
            # this rerun's freshly computed dict.
            metadata = content['metadata']
            shown_fields = (
                'Project', 'Issue Type', 'Customer', 'Environment',
                'Functional Areas', 'Sprint', 'Story Points',
                'Regression Type', 'Number of Scenarios',
            )
            field_rows = "\n".join(
                f"<p><strong>{label}:</strong> {metadata[label]}</p>"
                for label in shown_fields
            )
            st.markdown(
                f"<div style='{box_style}'>\n{field_rows}\n</div>",
                unsafe_allow_html=True,
            )

            col1, col2 = st.columns(2)
            with col1:
                if st.button("🔄 Regenerate Content", key="regenerate"):
                    st.session_state.task_content = None
                    st.rerun()
            with col2:
                if st.button("📝 Create Jira Task", key="create"):
                    task = create_regression_task(
                        metadata['Project Key'],
                        content['summary'],
                        content['description'],
                        content['environment'],
                        filtered_scenarios_df
                    )
                    if task:
                        st.session_state.last_task_key = task.key
                        st.session_state.last_task_url = f"{JIRA_SERVER}/browse/{task.key}"
                        st.session_state.show_success = True
                        st.session_state.task_content = None
                        # Ask for a sprint-stats refresh on the next load.
                        st.session_state.force_sprint_refresh = True
                        st.rerun()
                    else:
                        st.error("Failed to create task. Please try again.")
def _render_cascading_field(field, heading, missing_message, raw_log_header,
                            field_log_name, log_tag=""):
    """Render one cascading-select field as styled parent/child boxes.

    Args:
        field: Field definition dict, expected to hold 'allowedValues'.
        heading: Markdown heading shown above the boxes.
        missing_message: Warning shown and logged when no values exist.
        raw_log_header: Log line written before the raw values are dumped.
        field_log_name: Name used when logging the field's available keys.
        log_tag: Word inserted into log messages to tell fields apart.
    """
    if not field or 'allowedValues' not in field:
        st.warning(missing_message)
        logger.warning(missing_message)
        if field:
            logger.info(f"Available {field_log_name} keys: " + str(list(field.keys())))
        return

    st.markdown(heading)

    logger.info(raw_log_header)
    for value in field['allowedValues']:
        logger.info(f"Raw value: {value}")

    # First pass: collect parent -> sorted children.
    parent_child_map = {}
    for value in field['allowedValues']:
        if not isinstance(value, dict):
            continue
        parent_value = value.get('value', 'Unknown')
        if not parent_value:
            continue
        child_values = []
        logger.info(f"\nProcessing {log_tag}parent: {parent_value}")
        # The file reads child options under both 'cascadingOptions' and
        # 'children' in different places; accept either here.
        # NOTE(review): confirm which key the metadata really uses.
        options = value.get('cascadingOptions') or value.get('children') or []
        if options:
            logger.info(f"Found {log_tag}cascading options for {parent_value}:")
        for child in options:
            logger.info(f"Raw child value: {child}")
            if isinstance(child, dict) and child.get('value'):
                child_value = child.get('value')
                child_values.append(child_value)
                logger.info(f" - Added child: {child_value}")
        parent_child_map[parent_value] = sorted(child_values)
        logger.info(f"Final {log_tag}children for {parent_value}: {parent_child_map[parent_value]}")

    # Second pass: one st.markdown call per parent. Each call renders as its
    # own element, so a <div> opened in one call cannot wrap content emitted
    # by later calls; the whole box must be a single HTML string.
    box_style = (
        "background-color: #f0f2f6; padding: 10px; border-radius: 5px; "
        "border: 1px solid #e0e0e0; color: #0f1629; margin-bottom: 10px;"
    )
    for parent_value in sorted(parent_child_map):
        child_values = parent_child_map[parent_value]
        list_html = ""
        if child_values:
            items_html = "".join(f"<li>{child}</li>" for child in child_values)
            list_html = f"<ul style='margin-bottom: 0; margin-top: 5px;'>{items_html}</ul>"
        st.markdown(
            f"<div style='{box_style}'><strong>{parent_value}</strong>{list_html}</div>",
            unsafe_allow_html=True,
        )

        logger.info(f"Displaying {log_tag.title()}Parent: {parent_value}")
        if child_values:
            logger.info(f" With Children: {', '.join(child_values)}")
        else:
            logger.info(" No children found")


def display_functional_areas(metadata):
    """Display the functional area and customer fields as parent/child trees.

    Args:
        metadata: Either project metadata (holding 'all_fields') or task
            metadata; in the latter case the project metadata for the fixed
            project "RS" is fetched first.
    """
    if not metadata:
        st.error("No metadata available")
        return

    # Task metadata has no field definitions; fetch the project's instead.
    if 'all_fields' not in metadata:
        project_metadata = get_project_metadata("RS")  # RS is the fixed project key
        if not project_metadata:
            st.error("Could not fetch project metadata")
            return
        metadata = project_metadata

    # .get() so that fetched metadata without 'all_fields' falls through to
    # the "no values" warnings rather than raising KeyError.
    all_fields = metadata.get('all_fields') or {}

    _render_cascading_field(
        all_fields.get('customfield_13100', {}),
        heading="### Available Functional Areas",
        missing_message="No functional area values found in metadata",
        raw_log_header="=== Raw Functional Area Values ===",
        field_log_name="func_field",
    )
    _render_cascading_field(
        all_fields.get('customfield_10427', {}),
        heading="### Available Customer Values",
        missing_message="No customer field values found in metadata",
        raw_log_header="=== Raw Customer Field Values ===",
        field_log_name="cust_field",
        log_tag="customer ",
    )
def display_story_points_stats(force_refresh=False):
    """Display story point metrics for the current sprint of the RS board.

    Results are cached in ``st.session_state.sprint_data`` and refreshed when
    forced, when no data is cached, or when the cache is older than five
    minutes.

    Args:
        force_refresh: When True, bypass the cache and fetch fresh data. The
            session flag ``force_sprint_refresh`` has the same effect and is
            cleared after a successful fetch.
    """
    # .get() tolerates a session where the Jira keys were never initialised.
    if not st.session_state.get('jira_client'):
        return

    if 'sprint_data' not in st.session_state:
        st.session_state.sprint_data = None
    if 'last_sprint_refresh' not in st.session_state:
        st.session_state.last_sprint_refresh = None

    # Honour the refresh request left in session state after task creation.
    if st.session_state.get('force_sprint_refresh'):
        force_refresh = True

    current_time = datetime.now()
    cache_expiry = 300  # seconds (5 minutes)

    last_refresh = st.session_state.last_sprint_refresh
    cache_expired = bool(
        last_refresh
        and (current_time - last_refresh).total_seconds() > cache_expiry
    )
    refresh_needed = (
        force_refresh
        or st.session_state.sprint_data is None
        or cache_expired
    )

    if refresh_needed:
        # The spinner is only shown while new data is being fetched.
        with st.spinner("Fetching sprint data..."):
            board = get_regression_board("RS")
            if not board:
                return
            sprint = get_current_sprint(board['id'])
            if not sprint:
                return
            issues = get_sprint_issues(board['id'], sprint.id, board['estimation_field'])
            if not issues:
                return

            _, total_points, completed_points, in_progress_points = calculate_points(
                issues, board['estimation_field']
            )

            st.session_state.sprint_data = {
                'sprint_name': sprint.name,
                'total_points': total_points,
                'completed_points': completed_points,
                'in_progress_points': in_progress_points
            }
            st.session_state.last_sprint_refresh = current_time
            st.session_state.force_sprint_refresh = False

    if st.session_state.sprint_data:
        sprint_data = st.session_state.sprint_data
        total = sprint_data['total_points']
        completed = sprint_data['completed_points']
        # Fraction completed; 0 when the sprint has no points.
        progress = completed / total if total > 0 else 0

        cols = st.columns(4)
        with cols[0]:
            st.metric("Total", f"{total:.1f}")
        with cols[1]:
            st.metric("Done", f"{completed:.1f}")
        with cols[2]:
            st.metric("In Progress", f"{sprint_data['in_progress_points']:.1f}")
        with cols[3]:
            st.metric("Complete", f"{progress * 100:.1f}%")

        # st.progress only accepts floats within [0.0, 1.0], so clamp.
        st.progress(min(max(float(progress), 0.0), 1.0))
def main():
    """Run the standalone Jira integration test page.

    Offers a button to load sample scenarios, a sidebar with the Jira
    connection control and sprint statistics, the failed-scenario workflow
    once data is loaded, and a button to show project fields.
    """
    st.title("Jira Integration Test")

    if st.button("Load Test Data"):
        st.session_state.filtered_scenarios_df = create_test_data()
        st.success("Test data loaded!")

    # Sidebar: Jira login control and sprint statistics.
    with st.sidebar:
        if 'is_authenticated' not in st.session_state:
            st.session_state.is_authenticated = False

        if st.session_state.is_authenticated and st.session_state.get('jira_client'):
            st.success("Connected to Jira")
        elif st.button("Connect to Jira"):
            is_authenticated = render_jira_login()
            st.session_state.is_authenticated = is_authenticated
            # st.rerun() matches the rest of this file;
            # st.experimental_rerun() is the deprecated spelling.
            st.rerun()

        if st.session_state.get('is_authenticated', False) and st.session_state.get('projects'):
            # Project and board are fixed, so show them as static labels.
            label_style = "color: #E0E0E0; margin-bottom: 4px;"
            value_style = (
                "background-color: #262730; padding: 5px 8px; "
                "border-radius: 4px; font-size: 0.9em;"
            )
            st.markdown(
                "\n".join([
                    "<div style='display: flex; gap: 10px; margin-bottom: 15px; font-size: 0.9em;'>",
                    "<div style='flex: 1;'>",
                    f"<div style='{label_style}'>Project</div>",
                    f"<div style='{value_style}'>RS - Regression</div>",
                    "</div>",
                    "<div style='flex: 1;'>",
                    f"<div style='{label_style}'>Board</div>",
                    f"<div style='{value_style}'>Regression Sprints (scrum)</div>",
                    "</div>",
                    "</div>",
                ]),
                unsafe_allow_html=True,
            )

            # Cached stats are reused; data is fetched only when needed.
            display_story_points_stats(force_refresh=False)

    if 'filtered_scenarios_df' in st.session_state:
        scenarios_df = st.session_state.filtered_scenarios_df
        st.subheader("Failed Scenarios")
        st.dataframe(scenarios_df)

        # The environment comes from the first row of the loaded data.
        if 'Environment' in scenarios_df.columns:
            environment = scenarios_df['Environment'].iloc[0]
            st.info(f"Using environment from data: {environment}")
            process_failures_button(scenarios_df, environment)
        else:
            st.error("No environment information found in the data")

    if st.button("Show Project Fields"):
        display_project_fields()


if __name__ == "__main__":
    main()