# Spaces: Sleeping
import pandas as pd | |
import streamlit as st | |
import csv | |
import io | |
import matplotlib.pyplot as plt | |
import numpy as np | |
def preprocess_csv(input_bytes):
    """Normalise raw CSV bytes so that every row has at most six fields.

    Rows with more than six fields keep their first five fields unchanged;
    all remaining fields are re-joined with commas into a single sixth
    field. Rows with six fields or fewer are written back unchanged.

    Args:
        input_bytes: The CSV content as UTF-8 encoded bytes. A leading
            byte-order mark, if present, is discarded.

    Returns:
        An ``io.StringIO`` holding the rewritten CSV, positioned at the
        start so it can be read immediately.

    Raises:
        UnicodeDecodeError: If ``input_bytes`` is not valid UTF-8.
    """
    # "utf-8-sig" decodes plain UTF-8 as well, but also strips a leading BOM
    # that would otherwise end up glued to the first field of the first row.
    text = input_bytes.decode("utf-8-sig")

    output = io.StringIO()
    writer = csv.writer(output)
    # newline="" lets the csv module do its own line-ending handling, as the
    # csv documentation recommends for any file-like object it reads.
    for row in csv.reader(io.StringIO(text, newline="")):
        if len(row) > 5:
            # Combine the overflow back into one field; the writer quotes it
            # because it contains commas.
            row = row[:5] + [",".join(row[5:])]
        writer.writerow(row)

    output.seek(0)  # rewind so callers read from the beginning
    return output
def load_data(file):
    """Read a header-less CSV into a DataFrame with the six report columns.

    Args:
        file: A path or file-like object accepted by ``pandas.read_csv``.

    Returns:
        A DataFrame whose columns are, in order: 'Functional area',
        'Scenario name', 'Start datetime', 'End datetime', 'Status' and
        'Error message'.
    """
    # The input has no header row, so the column labels are supplied here.
    return pd.read_csv(
        file,
        header=None,
        names=[
            'Functional area',
            'Scenario name',
            'Start datetime',
            'End datetime',
            'Status',
            'Error message',
        ],
    )
def fill_missing_data(data, column_index, value):
    """Replace missing entries in one column, selected by position.

    Args:
        data: The DataFrame to update; it is modified in place.
        column_index: Zero-based position of the column to fill.
        value: Replacement for missing entries in that column.

    Returns:
        The same ``data`` object that was passed in.
    """
    filled_column = data.iloc[:, column_index].fillna(value)
    data.iloc[:, column_index] = filled_column
    return data
def _format_minutes_seconds(seconds):
    """Format a duration in seconds as ``MM:SS`` without wrapping at one hour."""
    whole_seconds = int(seconds)
    sign = "-" if whole_seconds < 0 else ""
    minutes, remainder = divmod(abs(whole_seconds), 60)
    return f"{sign}{minutes:02d}:{remainder:02d}"


def _show_scenarios_for_status(scenarios, status_label):
    """Render the area selector and grouped scenario table; return the filtered rows."""
    # A plain list keeps the original area values (np.append would coerce
    # them to strings); missing areas are dropped because they can never
    # match a row in the equality filter below.
    area_options = [*scenarios['Functional area'].dropna().unique(), "All"]
    selected_area = st.selectbox(
        "Select a functional area",
        area_options,
        index=len(area_options) - 1,  # preselect "All"
    )

    st.markdown(f"### Scenarios with status '{status_label}' grouped by functional area:")

    if selected_area == "All":
        filtered = scenarios
    else:
        filtered = scenarios[scenarios['Functional area'] == selected_area]

    grouped = filtered.groupby('Functional area')[['Scenario name', 'Error message']].apply(
        lambda group: group.reset_index(drop=True)
    )
    st.dataframe(grouped)
    return filtered


def _show_average_time(scenarios):
    """Render the per-area average duration table, ordered by earliest start."""
    summary = (
        scenarios.groupby('Functional area')
        .agg(**{
            'Time spent': ('Time spent', 'mean'),
            'Start datetime': ('Start datetime', 'min'),
        })
        .reset_index()
        .sort_values(by='Start datetime')
    )
    # Averages that could not be computed stay missing instead of being formatted.
    summary['Time spent'] = summary['Time spent'].map(
        _format_minutes_seconds, na_action='ignore'
    )
    st.markdown("### Average Time Spent on Each Functional Area")
    st.dataframe(summary)


def _show_failure_chart(failed_scenarios):
    """Render a bar chart of the number of failed scenarios per functional area."""
    st.write("### Bar graph showing number of failures in each functional area:")
    error_counts = failed_scenarios['Functional area'].value_counts()

    # Use an explicit figure rather than the global pyplot state, and close it
    # afterwards: Streamlit reruns the script on every interaction, so figures
    # left open would accumulate.
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.bar(error_counts.index, error_counts.values)
    ax.set_xlabel('Functional Area')
    ax.set_ylabel('Number of Errors')
    ax.set_title('Number of Errors by Functional Area')
    plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
    fig.tight_layout()  # keep the rotated labels inside the figure
    st.pyplot(fig)
    plt.close(fig)


def single_main(uploaded_file):
    """Analyse one uploaded scenario-report CSV and render the results.

    The page shows, in order: a status selector, a functional-area selector
    with the matching scenarios grouped by area (or a notice when no scenario
    has the selected status), the total failed and passed counts, the average
    time spent per functional area, and a bar chart of failures per area.

    Args:
        uploaded_file: An object with a ``read()`` method returning the CSV
            content as bytes, or ``None``, in which case nothing is rendered.
    """
    if uploaded_file is None:
        return

    # preprocess_csv returns a rewound text buffer that read_csv can consume.
    data = load_data(preprocess_csv(uploaded_file.read()))
    # Column index 4 is 'Status'; missing entries there are replaced with 0.
    data = fill_missing_data(data, 4, 0)

    # Unparseable timestamps become NaT and therefore yield a missing duration.
    data['Start datetime'] = pd.to_datetime(data['Start datetime'], errors='coerce')
    data['End datetime'] = pd.to_datetime(data['End datetime'], errors='coerce')
    data['Time spent'] = (data['End datetime'] - data['Start datetime']).dt.total_seconds()

    failed_scenarios = data[data['Status'] == 'FAILED']
    passed_scenarios = data[data['Status'] == 'PASSED']
    scenarios_by_status = {'Failed': failed_scenarios, 'Passed': passed_scenarios}

    selected_status = st.radio("Select a status", list(scenarios_by_status))
    selected_scenarios = scenarios_by_status[selected_status]

    has_matches = not selected_scenarios.empty
    if has_matches:
        filtered_scenarios = _show_scenarios_for_status(selected_scenarios, selected_status)
    else:
        st.write(f"### No scenarios with status '{selected_status.lower()}' found.")

    st.write(f"Failing scenarios Count: {len(failed_scenarios)}")
    st.write(f"Passing scenarios Count: {len(passed_scenarios)}")

    if has_matches:
        _show_average_time(filtered_scenarios)

    _show_failure_chart(failed_scenarios)
def main():
    """Render the page title and uploader, then analyse the uploaded CSV."""
    st.title('CSV Analyser')
    csv_upload = st.file_uploader("Upload CSV file", type="csv")
    if csv_upload is None:
        # Nothing uploaded yet; only the title and uploader are shown.
        return
    single_main(csv_upload)
# Start the app only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()