# testPlan / src / streamlit_app.py
# iemdpk's picture
# Update src/streamlit_app.py
# eb0ce7d verified
"""
Azure DevOps Test Management Tool
A Streamlit application for managing test work items in Azure DevOps.
"""
import base64
import html
import io
import json
from typing import List, Dict, Optional, Any

import pandas as pd
import requests
import streamlit as st
# Page configuration: browser-tab title and icon, wide layout, sidebar open.
st.set_page_config(
    page_title="Azure DevOps Test Manager",
    page_icon="🧪",  # test-tube emoji
    layout="wide",
    initial_sidebar_state="expanded",
)
# Custom CSS for dark theme styling.
# The stylesheet lives in a named constant so the injection call stays short.
_DARK_THEME_CSS = """
<style>
/* Dark background for main content */
.main .block-container {
background-color: #1a1d29;
color: #ffffff;
}
/* Sidebar dark theme */
[data-testid="stSidebar"] {
background-color: #1a1d29;
}
[data-testid="stSidebar"] .stMarkdown {
color: #ffffff;
}
/* Headers */
.main-header {
font-size: 2.5rem;
font-weight: bold;
color: #ffa800;
margin-bottom: 1rem;
}
h1, h2, h3, h4, h5, h6 {
color: #ffa800 !important;
}
/* Labels and text */
label, .stMarkdown, p, span {
color: #ffffff !important;
}
/* Work item cards with dark theme */
.work-item-card {
background-color: #252a3a;
border-radius: 10px;
padding: 20px;
margin-bottom: 15px;
border-left: 4px solid #ffa800;
box-shadow: 0 2px 8px rgba(0,0,0,0.3);
color: #ffffff;
}
.work-item-id {
font-size: 0.9rem;
color: #ffa800;
font-weight: bold;
}
.work-item-title {
font-size: 1.2rem;
font-weight: bold;
color: #ffffff;
margin: 10px 0;
}
.work-item-status {
display: inline-block;
padding: 4px 12px;
border-radius: 12px;
font-size: 0.8rem;
font-weight: bold;
}
.status-active {
background-color: #1e3a5f;
color: #4fc3f7;
}
.status-closed {
background-color: #1b5e20;
color: #4CAF50;
}
.status-new {
background-color: #e65100;
color: #ffffff;
}
/* Messages */
.success-message {
background-color: #1b5e20;
color: #4CAF50;
padding: 10px;
border-radius: 5px;
margin: 10px 0;
border: 1px solid #4CAF50;
}
.error-message {
background-color: #5c1c1c;
color: #FF6B6B;
padding: 10px;
border-radius: 5px;
margin: 10px 0;
border: 1px solid #FF6B6B;
}
/* Comment section */
.comment-section {
background-color: #2d1b00;
border: 1px solid #ffa800;
border-radius: 8px;
padding: 15px;
margin-top: 10px;
}
/* Project cards */
.project-card {
background-color: #252a3a;
border: 2px solid #ffa800;
border-radius: 10px;
padding: 15px;
margin-bottom: 10px;
cursor: pointer;
transition: all 0.3s;
color: #ffffff;
}
.project-card:hover {
background-color: #31384a;
transform: translateY(-2px);
box-shadow: 0 4px 12px rgba(255,168,0,0.2);
}
/* Connection status */
.connection-status {
padding: 10px;
border-radius: 5px;
margin: 10px 0;
}
.connected {
background-color: #1b5e20;
color: #4CAF50;
border: 1px solid #4CAF50;
}
.disconnected {
background-color: #5c4a00;
color: #ffa800;
border: 1px solid #ffa800;
}
/* Debug section */
.debug-section {
background-color: #1a1a2e;
border: 1px solid #666;
border-radius: 5px;
padding: 10px;
margin: 10px 0;
font-family: monospace;
font-size: 0.85rem;
color: #00ff00;
}
/* Info boxes */
.stAlert {
background-color: #252a3a !important;
color: #ffffff !important;
border: 1px solid #ffa800 !important;
}
/* Text inputs */
[data-testid="stTextInput"] input {
background-color: #252a3a !important;
color: #ffffff !important;
border: 1px solid #ffa800 !important;
}
/* Select boxes */
[data-testid="stSelectbox"] > div > div {
background-color: #252a3a !important;
color: #ffffff !important;
border: 1px solid #ffa800 !important;
}
/* Text area */
[data-testid="stTextArea"] textarea {
background-color: #252a3a !important;
color: #ffffff !important;
border: 1px solid #ffa800 !important;
}
/* Multiselect */
[data-testid="stMultiSelect"] > div {
background-color: #252a3a !important;
}
/* Tabs */
button[data-baseweb="tab"] {
background-color: #252a3a !important;
color: #ffffff !important;
}
button[data-baseweb="tab"][aria-selected="true"] {
background-color: #ffa800 !important;
color: #1a1d29 !important;
}
/* Divider */
hr {
border-color: #ffa800 !important;
opacity: 0.3;
}
</style>
"""

# Emit the stylesheet as raw HTML once, at script start.
st.markdown(_DARK_THEME_CSS, unsafe_allow_html=True)
class AzureDevOpsClient:
    """Client for interacting with the Azure DevOps REST API (api-version 7.0).

    Requests are authenticated with a Personal Access Token (PAT) sent as
    HTTP Basic credentials. Request failures are caught, reported to the user
    through ``st.error`` and signalled to the caller with an empty or falsy
    return value.
    """

    # Seconds to wait on any single HTTP call. Without a timeout a stalled
    # connection would block the Streamlit script run indefinitely.
    REQUEST_TIMEOUT = 30

    # Number of work item IDs requested per details call (keeps the
    # comma-separated ``ids`` query string bounded).
    DETAILS_BATCH_SIZE = 200

    def __init__(self, organization: str, pat: str, project: Optional[str] = None):
        """Store the connection settings and pre-compute the auth headers.

        Args:
            organization: Organization name as used in ``dev.azure.com/{organization}``.
            pat: Personal Access Token.
            project: Optional project name; project-scoped calls require it
                (see ``set_project``).
        """
        self.organization = organization
        self.project = project
        self.pat = pat
        self.base_url = f"https://dev.azure.com/{organization}"
        self.auth_header = self._get_auth_header()
        self.debug_info: List[str] = []  # Trace lines from the last get_work_items(debug=True)

    def _get_auth_header(self) -> Dict[str, str]:
        """Create authorization header with PAT (empty user name, PAT as password)."""
        credentials = base64.b64encode(f":{self.pat}".encode()).decode()
        return {
            "Authorization": f"Basic {credentials}",
            "Content-Type": "application/json"
        }

    def _fetch_value_list(self, url: str, what: str) -> List[Dict]:
        """GET ``url`` and return the ``value`` array of the JSON response.

        Args:
            url: Fully built endpoint URL.
            what: Noun used in the user-facing error ("Error fetching {what}: ...").

        Returns:
            The ``value`` list, or ``[]`` on any request/decoding failure.
        """
        try:
            response = requests.get(url, headers=self.auth_header, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            return response.json().get("value", [])
        # ValueError covers a non-JSON body on requests versions whose
        # JSONDecodeError is not a RequestException.
        except (requests.exceptions.RequestException, ValueError) as e:
            st.error(f"Error fetching {what}: {str(e)}")
            return []

    def test_connection(self) -> bool:
        """Test if the connection to Azure DevOps is valid.

        Returns:
            True only when the projects endpoint answers successfully with a
            JSON body; False on any request or decoding failure.
        """
        url = f"{self.base_url}/_apis/projects?$top=1&api-version=7.0"
        try:
            response = requests.get(url, headers=self.auth_header, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            # NOTE(review): Azure DevOps has been observed to answer a bad PAT
            # with a 2xx HTML sign-in page instead of 401 -- confirm. Requiring
            # a JSON body rejects that case.
            response.json()
            return True
        except (requests.exceptions.RequestException, ValueError):
            return False

    def get_projects(self) -> List[Dict]:
        """Fetch the projects of the organization (first page of the API response)."""
        url = f"{self.base_url}/_apis/projects?api-version=7.0"
        return self._fetch_value_list(url, "projects")

    @staticmethod
    def _quote_wiql(value: str) -> str:
        """Return ``value`` as a single-quoted WIQL string literal.

        Embedded single quotes are doubled so a name such as ``O'Neil`` cannot
        terminate the literal early.
        """
        return "'" + str(value).replace("'", "''") + "'"

    @classmethod
    def _build_wiql(cls, work_item_type: Optional[str], iteration_path: Optional[str],
                    area_path: Optional[str]) -> str:
        """Build the default WIQL query, newest change first.

        A filter is skipped when it is empty/None or equals its "All ..."
        placeholder from the sidebar.
        """
        conditions = ["[System.TeamProject] = @project"]
        filters = (
            ("System.WorkItemType", work_item_type, "All Types"),
            ("System.IterationPath", iteration_path, "All Iterations"),
            ("System.AreaPath", area_path, "All Areas"),
        )
        for field_name, value, placeholder in filters:
            if value and value != placeholder:
                conditions.append(f"[{field_name}] = {cls._quote_wiql(value)}")
        where_clause = " AND ".join(conditions)
        return (
            "SELECT [System.Id], [System.Title], [System.State], "
            "[System.WorkItemType], [System.IterationPath]\n"
            "FROM workitems\n"
            f"WHERE {where_clause}\n"
            "ORDER BY [System.ChangedDate] DESC"
        )

    def _show_auth_help(self) -> None:
        """Explain a 401 response: which PAT scopes are needed and how to get them."""
        st.error("🔒 **Authentication Error (401)**")
        st.markdown(f"""
            Your PAT (Personal Access Token) doesn't have the required permissions.
            **Required PAT Scopes:**
            - ✅ Work Items: **Read & Write**
            - ✅ Project and Team: **Read**
            **How to create a new PAT:**
            1. Go to: https://dev.azure.com/{self.organization}/_usersSettings/tokens
            2. Click **"New Token"**
            3. Give it a name (e.g., "Test Manager")
            4. Set expiration
            5. Under **Scopes**, select:
            - **Work Items**: Read & Write
            - **Project and Team**: Read
            6. Click **Create** and copy the token
            7. Paste it in the sidebar and reconnect
            """)

    def get_work_items(self, wiql_query: Optional[str] = None, work_item_type: Optional[str] = None,
                       iteration_path: Optional[str] = None, area_path: Optional[str] = None,
                       debug: bool = False) -> List[Dict]:
        """Fetch work items using a WIQL query.

        Args:
            wiql_query: Complete WIQL query. When None, a query for the current
                project is built from the three filters below.
            work_item_type: Exact work item type, or None / "All Types".
            iteration_path: Exact iteration path, or None / "All Iterations".
            area_path: Exact area path, or None / "All Areas".
            debug: When True, trace lines are collected in ``self.debug_info``.

        Returns:
            Detailed work item dicts, or ``[]`` when no project is selected,
            nothing matches, or the request fails.
        """
        self.debug_info = []  # Reset debug info
        if not self.project:
            st.error("No project selected!")
            return []

        if wiql_query is None:
            wiql_query = self._build_wiql(work_item_type, iteration_path, area_path)

        url = f"{self.base_url}/{self.project}/_apis/wit/wiql?api-version=7.0"
        if debug:
            self.debug_info.append(f"URL: {url}")
            self.debug_info.append(f"Query: {wiql_query}")
            self.debug_info.append(f"Project: {self.project}")

        try:
            response = requests.post(
                url,
                headers=self.auth_header,
                json={"query": wiql_query},
                timeout=self.REQUEST_TIMEOUT,
            )
            if debug:
                self.debug_info.append(f"Status Code: {response.status_code}")
            response.raise_for_status()
            references = response.json().get("workItems", [])
            if debug:
                self.debug_info.append(f"Query returned {len(references)} work items")
            work_item_ids = [item["id"] for item in references]
            if not work_item_ids:
                if debug:
                    self.debug_info.append("No work item IDs returned from query")
                return []
            # The WIQL endpoint only returns references; fetch the full items.
            return self._get_work_item_details(work_item_ids, debug)
        except (requests.exceptions.RequestException, ValueError) as e:
            error_msg = f"Error fetching work items: {str(e)}"
            # Compare with None explicitly: a requests.Response for a 4xx/5xx
            # status is falsy, so a plain truthiness test would skip it.
            failed_response = getattr(e, "response", None)
            if failed_response is not None and failed_response.status_code == 401:
                self._show_auth_help()
            else:
                st.error(error_msg)
            if debug:
                self.debug_info.append(error_msg)
                if failed_response is not None:
                    self.debug_info.append(f"Status Code: {failed_response.status_code}")
                    self.debug_info.append(f"Response: {failed_response.text[:500]}")
            return []

    def get_work_item_types(self) -> List[str]:
        """Get the sorted names of the work item types available in the project."""
        if not self.project:
            return []
        url = f"{self.base_url}/{self.project}/_apis/wit/workitemtypes?api-version=7.0"
        types = self._fetch_value_list(url, "work item types")
        return sorted(t.get("name", "") for t in types if t.get("name"))

    def _get_work_item_details(self, work_item_ids: List[int], debug: bool = False) -> List[Dict]:
        """Get detailed information for work items, requesting them in batches.

        A failed batch is reported and skipped; the remaining batches are
        still fetched, so the result may be partial.
        """
        if not work_item_ids:
            return []
        all_items: List[Dict] = []
        for start in range(0, len(work_item_ids), self.DETAILS_BATCH_SIZE):
            batch_ids = work_item_ids[start:start + self.DETAILS_BATCH_SIZE]
            ids_str = ",".join(map(str, batch_ids))
            url = f"{self.base_url}/_apis/wit/workitems?ids={ids_str}&$expand=all&api-version=7.0"
            try:
                response = requests.get(url, headers=self.auth_header, timeout=self.REQUEST_TIMEOUT)
                response.raise_for_status()
                batch_items = response.json().get("value", [])
                all_items.extend(batch_items)
                if debug:
                    batch_number = start // self.DETAILS_BATCH_SIZE + 1
                    self.debug_info.append(f"Fetched {len(batch_items)} work items in batch {batch_number}")
            except (requests.exceptions.RequestException, ValueError) as e:
                error_msg = f"Error fetching work item details for batch: {str(e)}"
                st.error(error_msg)
                if debug:
                    self.debug_info.append(error_msg)
        return all_items

    def update_work_item_status(self, work_item_id: int, new_state: str, comment: Optional[str] = None) -> bool:
        """Update work item state and optionally add a comment.

        Returns:
            True when the state change succeeded. A failure to add the comment
            is reported by ``_add_comment`` but does not change the result.
        """
        url = f"{self.base_url}/_apis/wit/workitems/{work_item_id}?api-version=7.0"
        # JSON Patch document: a single operation setting System.State.
        patch_document = [
            {
                "op": "add",
                "path": "/fields/System.State",
                "value": new_state
            }
        ]
        try:
            response = requests.patch(
                url,
                headers={**self.auth_header, "Content-Type": "application/json-patch+json"},
                json=patch_document,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            if comment:
                self._add_comment(work_item_id, comment)
            return True
        except requests.exceptions.RequestException as e:
            st.error(f"Error updating work item {work_item_id}: {str(e)}")
            failed_response = getattr(e, "response", None)
            if failed_response is not None:
                st.error(f"Response: {failed_response.text}")
            return False

    def _add_comment(self, work_item_id: int, comment: str) -> bool:
        """Add a comment to a work item. Returns True on success."""
        # The work item Comments resource is a preview API: it must be called
        # with a "-preview" api-version, and its documented route is
        # project-scoped.
        project_segment = f"/{self.project}" if self.project else ""
        url = (
            f"{self.base_url}{project_segment}/_apis/wit/workItems/"
            f"{work_item_id}/comments?api-version=7.0-preview.3"
        )
        try:
            response = requests.post(
                url,
                headers=self.auth_header,
                json={"text": comment},
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return True
        except requests.exceptions.RequestException as e:
            st.error(f"Error adding comment to work item {work_item_id}: {str(e)}")
            return False

    def _qualify_path(self, path: str) -> str:
        """Prefix an iteration/area path with the project name unless already present.

        An empty path is returned unchanged so the caller can skip the field.
        """
        if not path or path.startswith(self.project):
            return path
        return f"{self.project}\\{path}"

    def create_work_item(self, title: str, description: str = "", work_item_type: str = "Task",
                         priority: int = 2, assigned_to: str = "", tags: str = "",
                         iteration: str = "", area: str = "") -> Optional[Dict]:
        """Create a new work item in the current project.

        Empty/zero optional arguments are left out of the request. ``iteration``
        and ``area`` may be given relative to the project.

        Returns:
            The created work item as returned by the API, or None when no
            project is selected or the request fails.
        """
        if not self.project:
            st.error("No project selected!")
            return None
        # The type goes into the URL path with a literal "$" prefix.
        url = f"{self.base_url}/{self.project}/_apis/wit/workitems/${work_item_type}?api-version=7.0"

        optional_fields = (
            ("System.Description", description),
            ("Microsoft.VSTS.Common.Priority", priority),
            ("System.AssignedTo", assigned_to),
            ("System.Tags", tags),
            ("System.IterationPath", self._qualify_path(iteration)),
            ("System.AreaPath", self._qualify_path(area)),
        )
        patch_document = [{"op": "add", "path": "/fields/System.Title", "value": title}]
        patch_document.extend(
            {"op": "add", "path": f"/fields/{field_name}", "value": value}
            for field_name, value in optional_fields
            if value
        )

        try:
            response = requests.post(
                url,
                headers={**self.auth_header, "Content-Type": "application/json-patch+json"},
                json=patch_document,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            st.error(f"Error creating work item: {str(e)}")
            failed_response = getattr(e, "response", None)
            if failed_response is not None:
                st.error(f"Response: {failed_response.text[:500]}")
            return None

    @staticmethod
    def _clean_cell(value: Any, default: str = "") -> str:
        """Convert one CSV cell to stripped text.

        Missing cells (None / NaN / NA, which is how pandas represents an
        empty CSV field) and blank strings yield ``default`` instead of the
        literal text "nan".
        """
        if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
            return default
        text = str(value).strip()
        return text or default

    def bulk_create_work_items(self, df: pd.DataFrame) -> tuple[int, List[str]]:
        """Create multiple work items from a DataFrame.

        Recognized columns: Title (required), Description, WorkItemType,
        Priority, AssignedTo, Tags, Iteration, Area.

        Returns:
            ``(created_count, errors)`` where ``errors`` holds one message per
            row that was skipped or failed; row numbers are 1-based positions
            in ``df``.
        """
        created_count = 0
        errors: List[str] = []
        for col in ("Title",):
            if col not in df.columns:
                return 0, [f"Required column '{col}' not found in CSV"]

        for row_number, (_, row) in enumerate(df.iterrows(), start=1):
            try:
                title = self._clean_cell(row.get("Title"))
                if not title:
                    errors.append(f"Row {row_number}: Title is empty")
                    continue
                raw_priority = row.get("Priority")
                priority = int(raw_priority) if pd.notna(raw_priority) else 2
                result = self.create_work_item(
                    title=title,
                    description=self._clean_cell(row.get("Description")),
                    work_item_type=self._clean_cell(row.get("WorkItemType"), "Task"),
                    priority=priority,
                    assigned_to=self._clean_cell(row.get("AssignedTo")),
                    tags=self._clean_cell(row.get("Tags")),
                    iteration=self._clean_cell(row.get("Iteration")),
                    area=self._clean_cell(row.get("Area")),
                )
                if result:
                    created_count += 1
                else:
                    errors.append(f"Row {row_number}: Failed to create '{title}'")
            # Deliberately broad: one malformed row (e.g. a non-numeric
            # Priority) must not abort the remaining rows.
            except Exception as e:
                errors.append(f"Row {row_number}: {str(e)}")
        return created_count, errors

    def get_test_plans(self) -> List[Dict]:
        """Fetch test plans from the project."""
        if not self.project:
            st.error("No project selected!")
            return []
        url = f"{self.base_url}/{self.project}/_apis/testplan/plans?api-version=7.0"
        return self._fetch_value_list(url, "test plans")

    def set_project(self, project: str):
        """Set the current project used by project-scoped calls."""
        self.project = project

    def get_iterations(self) -> List[Dict]:
        """Fetch the iterations (sprints) from the project's team settings."""
        if not self.project:
            return []
        url = f"{self.base_url}/{self.project}/_apis/work/teamsettings/iterations?api-version=7.0"
        return self._fetch_value_list(url, "iterations")

    def get_areas(self) -> List[Dict]:
        """Fetch the areas for the project."""
        if not self.project:
            return []
        # NOTE(review): confirm this route exists; the documented team-settings
        # resource for areas appears to be "teamfieldvalues" -- TODO verify.
        url = f"{self.base_url}/{self.project}/_apis/work/teamsettings/areas?api-version=7.0"
        return self._fetch_value_list(url, "areas")
def initialize_session_state():
    """Seed every session-state key the app reads, keeping values already set."""
    # Built on each call so every session gets its own fresh list objects.
    defaults = {
        "client": None,
        "projects": [],
        "selected_project": None,
        "work_items": [],
        "test_plans": [],
        "comment_work_item_id": None,
        "success_message": None,
        "connection_step": "connect",  # connect, select_project, connected
        "work_item_types": [],
        "debug_mode": False,
        "iterations": [],
        "selected_iteration": "All Iterations",
        "areas": [],
        "selected_area": "All Areas",
    }
    for key, initial_value in defaults.items():
        if key not in st.session_state:
            setattr(st.session_state, key, initial_value)
def reset_connection():
    """Drop the client and all project data, return to the connect step and rerun.

    The flash message for the next run is set before rerunning.
    """
    state = st.session_state
    state.client = None
    state.projects = []
    state.selected_project = None
    state.work_items = []
    state.test_plans = []
    state.work_item_types = []
    state.iterations = []
    state.selected_iteration = "All Iterations"
    state.areas = []
    state.selected_area = "All Areas"
    # Close any pending "why did it fail" comment box so it cannot reappear
    # for the same work item after reconnecting.
    state.comment_work_item_id = None
    state.connection_step = "connect"
    state.success_message = "Disconnected successfully"
    st.rerun()
def render_sidebar():
    """Render the sidebar for the current connection step.

    Steps (``st.session_state.connection_step``): "connect" ->
    "select_project" -> "connected". Nothing below the header is drawn when
    the step is "connected" but no client is stored.
    """
    with st.sidebar:
        st.header("🔧 Connection Settings")
        step = st.session_state.connection_step
        if step == "connect":
            _sidebar_connect_step()
        elif step == "select_project":
            _sidebar_project_step()
        elif step == "connected" and st.session_state.client:
            _sidebar_connected_step()


def _sidebar_connect_step():
    """Step 1: collect organization and PAT, test them and load the project list."""
    st.subheader("Step 1: Connect to Organization")
    org = st.text_input(
        "Organization",
        placeholder="your-organization",
        help="Your Azure DevOps organization name (from dev.azure.com/{organization})"
    )
    pat = st.text_input(
        "Personal Access Token (PAT)",
        type="password",
        placeholder="Enter your PAT",
        help="Create a PAT at: https://dev.azure.com/{org}/_usersSettings/tokens"
    )
    if not st.button("🔗 Connect to Organization", use_container_width=True, type="primary"):
        return

    # Stray whitespace from copy/paste would end up inside the URL / token.
    org = org.strip()
    pat = pat.strip()
    if not (org and pat):
        st.error("⚠️ Please fill in all fields")
        return

    with st.spinner("Connecting..."):
        client = AzureDevOpsClient(org, pat)
        if not client.test_connection():
            st.error("❌ Connection failed. Check your organization and PAT.")
            return
        st.session_state.client = client
        st.session_state.projects = client.get_projects()
        if st.session_state.projects:
            st.session_state.connection_step = "select_project"
            st.session_state.success_message = f"✅ Connected! Found {len(st.session_state.projects)} projects"
        else:
            st.session_state.success_message = "✅ Connected! No projects found"
        st.rerun()


def _sidebar_project_step():
    """Step 2: pick one of the fetched projects and load its metadata."""
    st.subheader("Step 2: Select Project")
    if not st.session_state.projects:
        st.warning("No projects found")
        if st.button("🔙 Back", use_container_width=True):
            st.session_state.connection_step = "connect"
            st.rerun()
        return

    project_names = [p.get("name", "") for p in st.session_state.projects]
    selected = st.selectbox("Choose a Project", options=project_names, index=0)
    select_col, back_col = st.columns(2)
    with select_col:
        if st.button("✅ Select Project", use_container_width=True, type="primary") and selected:
            client = st.session_state.client
            st.session_state.selected_project = selected
            client.set_project(selected)
            st.session_state.connection_step = "connected"
            # Fetch available work item types, iterations, and areas
            with st.spinner("Loading work item types, iterations, and areas..."):
                st.session_state.work_item_types = client.get_work_item_types()
                st.session_state.iterations = client.get_iterations()
                st.session_state.areas = client.get_areas()
            st.session_state.success_message = f"✅ Project '{selected}' selected!"
            st.rerun()
    with back_col:
        if st.button("🔙 Back", use_container_width=True):
            st.session_state.connection_step = "connect"
            st.session_state.projects = []
            st.rerun()


def _sidebar_connected_step():
    """Connected state: status box, query filters, project/connection controls, bulk upload."""
    client = st.session_state.client
    # Both values are free text (typed by the user / returned by the API), so
    # they are escaped before being placed in raw HTML.
    org_html = html.escape(str(client.organization))
    project_html = html.escape(str(st.session_state.selected_project))
    st.markdown(
        '<div class="connection-status connected">'
        "✅ <strong>Connected</strong><br>"
        f"Org: {org_html}<br>"
        f"Project: {project_html}"
        "</div>",
        unsafe_allow_html=True,
    )
    st.divider()

    # Work item type filter
    type_options = ["All Types"] + st.session_state.work_item_types
    selected_type = st.selectbox(
        "Filter by Type",
        options=type_options,
        index=0,
        help="Select a specific work item type or 'All Types' to see everything"
    )

    # Iteration filter
    iteration_options = ["All Iterations"] + [
        iteration.get("path", iteration.get("name", ""))
        for iteration in (st.session_state.iterations or [])
    ]
    selected_iteration = st.selectbox(
        "Filter by Iteration",
        options=iteration_options,
        index=0,
        help="Select a specific iteration/sprint or 'All Iterations' to see everything"
    )
    st.session_state.selected_iteration = selected_iteration

    # Area filter
    area_options = ["All Areas"] + [
        area.get("path", area.get("name", ""))
        for area in (st.session_state.areas or [])
    ]
    selected_area = st.selectbox(
        "Filter by Area",
        options=area_options,
        index=0,
        help="Select a specific area or 'All Areas' to see everything"
    )
    st.session_state.selected_area = selected_area

    # Debug mode toggle
    st.session_state.debug_mode = st.checkbox("🔍 Debug Mode", value=st.session_state.debug_mode)

    if st.button("📥 Load Work Items", use_container_width=True, type="primary"):
        with st.spinner("Fetching work items..."):
            st.session_state.work_items = client.get_work_items(
                work_item_type=None if selected_type == "All Types" else selected_type,
                iteration_path=None if selected_iteration == "All Iterations" else selected_iteration,
                area_path=None if selected_area == "All Areas" else selected_area,
                debug=st.session_state.debug_mode
            )
        if st.session_state.work_items:
            st.session_state.success_message = f"📋 Loaded {len(st.session_state.work_items)} work items"
        elif st.session_state.work_item_types:
            st.session_state.success_message = f"📋 No work items found. Available types in your project: {', '.join(st.session_state.work_item_types[:5])}"
        else:
            st.session_state.success_message = "📋 No work items found. Try selecting 'All Types' or check if work items exist in your project."
        st.rerun()

    st.divider()
    if st.button("🔄 Change Project", use_container_width=True):
        st.session_state.connection_step = "select_project"
        st.session_state.selected_project = None
        st.session_state.work_items = []
        st.session_state.test_plans = []
        st.session_state.work_item_types = []
        st.session_state.iterations = []
        st.session_state.selected_iteration = "All Iterations"
        st.session_state.areas = []
        st.session_state.selected_area = "All Areas"
        # A pending failure-comment box belongs to the project being left.
        st.session_state.comment_work_item_id = None
        st.rerun()

    if st.button("🔌 Disconnect", use_container_width=True, type="secondary"):
        reset_connection()

    st.divider()
    _sidebar_bulk_upload()


def _sidebar_bulk_upload():
    """Bulk upload: offer the CSV template, preview an uploaded CSV and create its rows."""
    st.subheader("📤 Bulk Upload Work Items")

    # The template is optional; a missing or unreadable file must not take
    # the whole sidebar down.
    try:
        with open("./src/demo_workitems.csv", "r", encoding="utf-8") as template_file:
            demo_csv_content = template_file.read()
    except OSError:
        demo_csv_content = None

    if demo_csv_content is None:
        st.caption("Demo CSV template is not available. Upload a CSV with at least a 'Title' column.")
    else:
        st.download_button(
            label="⬇️ Download Demo CSV Template",
            data=demo_csv_content,
            file_name="demo_workitems_template.csv",
            mime="text/csv",
            use_container_width=True
        )
        st.caption("Download the template, modify it with your data, then upload below.")

    uploaded_file = st.file_uploader(
        "Upload CSV File",
        type=['csv'],
        help="Upload a CSV file with work items. Required column: Title"
    )
    if uploaded_file is None:
        return

    # Only the parse is guarded, so the broad handler cannot intercept
    # anything raised by the widgets or by st.rerun() further down.
    try:
        df = pd.read_csv(uploaded_file)
    except Exception as e:
        st.error(f"❌ Error reading CSV: {str(e)}")
        return

    with st.expander("📋 Preview CSV Data", expanded=True):
        st.dataframe(df, use_container_width=True)
        st.write(f"**Total rows:** {len(df)}")

    if 'Title' not in df.columns:
        st.error("❌ CSV must contain a 'Title' column!")
        return
    if not st.button("🚀 Create Work Items", use_container_width=True, type="primary"):
        return

    with st.spinner("Creating work items..."):
        created_count, errors = st.session_state.client.bulk_create_work_items(df)
    if created_count > 0:
        st.success(f"✅ Successfully created {created_count} work items!")
    if errors:
        with st.expander(f"⚠️ Errors ({len(errors)})"):
            for error in errors:
                st.error(error)
    # Rerun only after a clean run: rerunning immediately would discard the
    # error list rendered above before the user can read it.
    if created_count > 0 and not errors:
        st.session_state.success_message = f"✅ Created {created_count} work items from CSV"
        st.rerun()
def get_status_class(state: str) -> str:
    """Get the CSS class for a work item state (case-insensitive).

    Returns:
        "status-closed" for closed/completed/done/resolved, "status-new" for
        new, and "status-active" for everything else, including "active",
        unknown states and an empty/None state.
    """
    state_lower = str(state or "").strip().lower()
    if state_lower in {"closed", "completed", "done", "resolved"}:
        return "status-closed"
    if state_lower == "new":
        return "status-new"
    return "status-active"
def render_work_item(work_item: Dict):
    """Render a single work item card with its Pass / Fail actions.

    "Pass" sets the item to "Resolved" with a fixed comment. "Fail" opens a
    comment box; submitting a non-empty comment sets the item to "Active".

    Args:
        work_item: Work item dict with ``id`` and a ``fields`` mapping, as
            stored in ``st.session_state.work_items``. After a successful
            update its ``System.State`` is changed in place so the card shows
            the new state on the next run.
    """
    fields = work_item.get("fields", {})
    work_item_id = work_item.get("id", "N/A")
    state = fields.get("System.State", "Unknown")
    status_class = get_status_class(state)

    # The assignee is read as a mapping with a displayName; tolerate a plain
    # string or a missing/None value as well.
    assigned = fields.get("System.AssignedTo")
    if isinstance(assigned, dict):
        assigned_to = assigned.get("displayName", "Unassigned")
    else:
        assigned_to = str(assigned) if assigned else "Unassigned"

    def esc(value) -> str:
        # Field values are free text entered by other users; escape them
        # before they go into raw HTML.
        return html.escape(str(value))

    card_html = (
        '<div class="work-item-card">'
        f'<div class="work-item-id">#{esc(work_item_id)} • {esc(fields.get("System.WorkItemType", "Unknown"))}</div>'
        f'<div class="work-item-title">{esc(fields.get("System.Title", "No Title"))}</div>'
        f'<div class="work-item-status {status_class}">{esc(state)}</div>'
        '<div style="margin-top: 8px; color: #aaa; font-size: 0.85rem;">'
        f'👤 Assigned to: {esc(assigned_to)}'
        '</div>'
        '<div style="margin-top: 4px; color: #ffa800; font-size: 0.85rem;">'
        f'📅 Iteration: {esc(fields.get("System.IterationPath", "Not assigned to iteration"))}'
        '</div>'
        '<div style="margin-top: 4px; color: #4fc3f7; font-size: 0.85rem;">'
        f'📁 Area: {esc(fields.get("System.AreaPath", "Not assigned to area"))}'
        '</div>'
        '</div>'
    )
    st.markdown(card_html, unsafe_allow_html=True)

    client = st.session_state.client

    # Action buttons (the wide third column is only a spacer)
    pass_col, fail_col, _ = st.columns([1, 1, 4])
    with pass_col:
        if st.button("✅ Pass", key=f"pass_{work_item_id}", type="primary") and client:
            if client.update_work_item_status(
                work_item_id, "Resolved", "Test passed - resolving work item"
            ):
                fields["System.State"] = "Resolved"
                st.session_state.success_message = f"✅ Work item #{work_item_id} resolved successfully!"
                st.rerun()
    with fail_col:
        if st.button("❌ Fail", key=f"fail_{work_item_id}", type="secondary"):
            st.session_state.comment_work_item_id = work_item_id
            st.rerun()

    # Comment section for failed test, shown only for the item whose "Fail"
    # button was pressed last.
    if st.session_state.comment_work_item_id != work_item_id:
        return

    st.markdown('<div class="comment-section">', unsafe_allow_html=True)
    st.warning("📝 Please add a comment explaining why this test failed:")
    comment = st.text_area(
        "Comment",
        key=f"comment_text_{work_item_id}",
        placeholder="Enter failure details...",
        height=100
    )
    submit_col, cancel_col = st.columns([1, 1])
    with submit_col:
        if st.button("💾 Submit & Reopen", key=f"submit_{work_item_id}", type="primary"):
            if not comment.strip():
                st.error("⚠️ Please enter a comment")
            elif client and client.update_work_item_status(work_item_id, "Active", comment):
                fields["System.State"] = "Active"
                st.session_state.comment_work_item_id = None
                st.session_state.success_message = f"📝 Work item #{work_item_id} reopened with comment!"
                st.rerun()
    with cancel_col:
        if st.button("Cancel", key=f"cancel_{work_item_id}"):
            st.session_state.comment_work_item_id = None
            st.rerun()
    st.markdown('</div>', unsafe_allow_html=True)
def main():
    """Main application function.

    Draws the header and sidebar, shows the one-shot flash message and the
    debug trace, then renders the main pane for the current connection step.
    """
    initialize_session_state()

    # Header
    st.markdown('<div class="main-header">🧪 Azure DevOps Test Manager</div>', unsafe_allow_html=True)

    # Sidebar
    render_sidebar()

    # Flash message: shown once, then cleared. It can contain project or
    # work item type names, so it is escaped before going into raw HTML.
    message = st.session_state.success_message
    if message:
        st.markdown(
            f'<div class="success-message">{html.escape(str(message))}</div>',
            unsafe_allow_html=True,
        )
        st.session_state.success_message = None

    # Debug information collected by the client during the last query
    client = st.session_state.client
    debug_lines = getattr(client, "debug_info", None) if client else None
    if st.session_state.debug_mode and debug_lines:
        with st.expander("🔍 Debug Information", expanded=True):
            st.markdown('<div class="debug-section">', unsafe_allow_html=True)
            for info in debug_lines:
                st.text(info)
            st.markdown('</div>', unsafe_allow_html=True)

    # Main content based on connection step
    step = st.session_state.connection_step
    if step == "connect":
        _show_quick_start()
    elif step == "select_project":
        _show_project_cards()
    elif step == "connected":
        _show_connected_view()


def _show_quick_start():
    """Main pane for the "connect" step: pointer to the sidebar plus a usage guide."""
    st.info("👈 Please enter your Azure DevOps organization and PAT in the sidebar to connect")
    st.markdown("""
        ### 🚀 Quick Start Guide
        1. **Enter your Organization** - This is your Azure DevOps organization name (e.g., `mycompany` from `dev.azure.com/mycompany`)
        2. **Enter your PAT** - Personal Access Token with Work Items read/write permissions
        3. **Click Connect** - Connect to your Azure DevOps organization
        4. **Select Project** - Choose from the list of available projects
        5. **Fetch Work Items** - Load your test cases and plans
        ### 📝 How to create a PAT
        1. Go to: `https://dev.azure.com/{your-org}/_usersSettings/tokens`
        2. Click "New Token"
        3. Give it a name and select expiration
        4. Scopes needed: **Work Items (Read & Write)**
        5. Create and copy the token
        ### 🐛 Troubleshooting
        If you're not seeing work items:
        - Enable **Debug Mode** in the sidebar to see detailed query information
        - Make sure your PAT has "Work Items (Read & Write)" permissions
        - Try selecting "All Types" in the type filter
        - Check that work items actually exist in your selected project
        """)


def _show_project_cards():
    """Main pane for the "select_project" step: one read-only card per project."""
    st.info("👈 Please select a project from the sidebar")
    if not st.session_state.projects:
        return
    st.subheader("📁 Available Projects")
    st.write("Select a project from the dropdown in the sidebar")
    for project in st.session_state.projects:
        # Project metadata comes from the API; escape it before it goes into
        # raw HTML.
        name = html.escape(str(project.get("name", "")))
        description = html.escape(str(project.get("description") or "No description"))
        state = html.escape(str(project.get("state", "")))
        st.markdown(
            '<div class="project-card">'
            f"<strong>📁 {name}</strong><br>"
            f"<small>{description}</small><br>"
            f"<small>State: {state}</small>"
            "</div>",
            unsafe_allow_html=True,
        )


def _show_connected_view():
    """Main pane for the "connected" step: help expanders, filters and work item cards."""
    work_item_types = st.session_state.work_item_types
    work_items = st.session_state.work_items

    # Show available work item types
    if work_item_types:
        with st.expander("ℹ️ Available Work Item Types in this Project"):
            st.write(", ".join(work_item_types))

    # Bulk Upload Info
    with st.expander("📤 Bulk Upload Work Items from CSV"):
        st.markdown("""
            ### How to Bulk Upload Work Items
            1. **Download the Demo CSV Template** from the sidebar
            2. **Modify the CSV** with your work items data
            3. **Upload the CSV** using the file uploader in the sidebar
            4. **Click 'Create Work Items'** to bulk create all work items
            ### CSV Format
            Required column:
            - **Title** - The title of the work item (required)
            Optional columns:
            - **Description** - Detailed description of the work item
            - **WorkItemType** - Type of work item (e.g., Task, Bug, Test Case, User Story)
            - **Priority** - Priority level (1=High, 2=Normal, 3=Low)
            - **AssignedTo** - Email or display name of assignee
            - **Tags** - Semicolon-separated tags (e.g., "tag1;tag2;tag3")
            - **Iteration** - Sprint/iteration name (e.g., "Sprint 1", "Iteration 2")
            - **Area** - Area path for the work item (e.g., "Frontend", "Backend", "Database")
            ### Available Work Item Types
            """)
        if work_item_types:
            st.write(", ".join(work_item_types))
        else:
            st.write("Common types: Task, Bug, Test Case, User Story, Feature, Epic")

    if not work_items:
        st.info("👈 Click 'Load Work Items' button in the sidebar to load work items")
        if work_item_types:
            st.write("**Available work item types in this project:**")
            st.write(", ".join(work_item_types))
        return

    st.subheader(f"📋 Work Items ({len(work_items)} found)")

    def distinct_values(field_name: str, fallback: str) -> List[str]:
        # Sorted so the option order is stable between runs.
        return sorted({item.get("fields", {}).get(field_name, fallback) for item in work_items})

    # Filter options; an empty selection means "no filter on this field".
    type_col, state_col, iteration_col = st.columns(3)
    with type_col:
        filter_type = st.multiselect(
            "Filter by Type",
            options=distinct_values("System.WorkItemType", "Unknown"),
            default=[]
        )
    with state_col:
        filter_state = st.multiselect(
            "Filter by State",
            options=distinct_values("System.State", "Unknown"),
            default=[]
        )
    with iteration_col:
        filter_iteration = st.multiselect(
            "Filter by Iteration",
            options=distinct_values("System.IterationPath", "Unassigned"),
            default=[]
        )
    st.markdown("---")

    # Apply filters
    active_filters = [
        ("System.WorkItemType", filter_type),
        ("System.State", filter_state),
        ("System.IterationPath", filter_iteration),
    ]
    filtered_items = work_items
    for field_name, wanted in active_filters:
        if wanted:
            filtered_items = [
                item for item in filtered_items
                if item.get("fields", {}).get(field_name) in wanted
            ]
    st.write(f"Showing {len(filtered_items)} of {len(work_items)} work items")

    # Render work items
    for work_item in filtered_items:
        render_work_item(work_item)
        st.markdown("---")
# Script entry point: build the page when this file is executed as the main
# module rather than imported.
if __name__ == "__main__":
    main()