# Source: StingrayExplorer/modules/QuickLook/AveragePowerSpectrum.py
# Commit 27762e4 (kartikmandar):
# feat: add service layer, performance monitoring, and UI improvements
import panel as pn
import holoviews as hv
from utils.app_context import AppContext
import pandas as pd
import warnings
import hvplot.pandas
import holoviews.operation.datashader as hd
from utils.DashboardClasses import (
MainHeader,
MainArea,
OutputBox,
WarningBox,
HelpBox,
Footer,
WarningHandler,
FloatingPlot,
PlotsContainer,
)
from stingray import AveragedPowerspectrum
# Palette of 20 distinct hex color strings.
# NOTE(review): not referenced anywhere in the visible part of this module;
# presumably intended as per-curve colors for overlaid plots — confirm.
colors = [
    # First ten entries: saturated tones.
    "#1f77b4",
    "#ff7f0e",
    "#2ca02c",
    "#d62728",
    "#9467bd",
    "#8c564b",
    "#e377c2",
    "#7f7f7f",
    "#bcbd22",
    "#17becf",
    # Last ten entries: lighter tints.
    "#aec7e8",
    "#ffbb78",
    "#98df8a",
    "#ff9896",
    "#c5b0d5",
    "#c49c94",
    "#f7b6d2",
    "#c7c7c7",
    "#dbdb8d",
    "#9edae5",
]
# Create a warning handler
def create_warning_handler():
    """
    Build a WarningHandler and install it as the process-wide warning hook.

    The handler's ``warn`` method is assigned to ``warnings.showwarning``,
    so this call replaces whatever hook was installed before it.

    Returns:
        WarningHandler: The newly created handler.
    """
    handler = WarningHandler()
    # Global side effect: every warning shown by the ``warnings`` module is
    # routed through this handler from now on.
    warnings.showwarning = handler.warn
    return handler
""" Header Section """
def create_quicklook_avg_powerspectrum_header(context: AppContext):
    """
    Build the header for the QuickLook averaged power spectrum page.

    Args:
        context: Application context. Accepted for a uniform signature;
            it is not read by this function.

    Returns:
        MainHeader: Header built from two text-input widgets, a heading
        pre-filled with the page title and an empty subheading.
    """
    heading_widget = pn.widgets.TextInput(
        name="Heading",
        value="QuickLook Averaged Power Spectrum",
    )
    subheading_widget = pn.widgets.TextInput(
        name="Subheading",
        value="",
    )
    header = MainHeader(heading=heading_widget, subheading=subheading_widget)
    return header
""" Output Box Section """
def create_loadingdata_output_box(content):
    """
    Wrap ``content`` in an OutputBox.

    Args:
        content: Value passed to OutputBox as ``output_content``.

    Returns:
        OutputBox: The box holding ``content``.
    """
    output_box = OutputBox(output_content=content)
    return output_box
""" Warning Box Section """
def create_loadingdata_warning_box(content):
    """
    Wrap ``content`` in a WarningBox.

    Args:
        content: Value passed to WarningBox as ``warning_content``.

    Returns:
        WarningBox: The box holding ``content``.
    """
    warning_box = WarningBox(warning_content=content)
    return warning_box
""" Float Panel """
def create_floatpanel_area(content, title):
    """
    Wrap ``content`` in a FloatingPlot with the given title.

    Args:
        content: Value passed to FloatingPlot as ``content``.
        title: Value passed to FloatingPlot as ``title``.

    Returns:
        FloatingPlot: The floating panel holding ``content``.
    """
    floating_plot = FloatingPlot(content=content, title=title)
    return floating_plot
""" Main Area Section """
def create_avg_powerspectrum_tab(
    context: AppContext,
    warning_handler,
):
    """
    Build the widget column for the "Averaged Power Spectrum" tab.

    The tab lets the user pick one loaded event list (or several to
    overlay), choose ``dt``, normalization and segment size, and then
    generate a plot, show the underlying DataFrame, or combine several
    spectra into one overlay.

    Args:
        context: Application context. Loaded event lists are read from
            ``context.state.get_event_data()``, whose entries are indexed
            as ``(name, event_list)``. Spectra are computed through
            ``context.services.spectrum`` and converted through
            ``context.services.export``; both return dicts with
            ``"success"``, ``"message"`` and ``"data"`` keys. Results are
            published with ``context.update_container`` and
            ``context.append_to_container`` on the ``'output_box'``,
            ``'plots'`` and ``'float_panel'`` containers.
        warning_handler: Accepted for interface compatibility; not used
            inside this function.

    Returns:
        pn.Column: The tab's widgets followed by a row of action buttons.
    """

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    def event_list_options():
        """Map each loaded event-list name to its index in the state list."""
        return {
            name: index
            for index, (name, _event) in enumerate(context.state.get_event_data())
        }

    def event_list_name(index):
        """Return the name stored alongside the event list at ``index``."""
        return context.state.get_event_data()[index][0]

    def report(message):
        """Replace the output box content with ``message``."""
        context.update_container(
            'output_box', create_loadingdata_output_box(message)
        )

    def failure_text(summary, error):
        """Append the service error detail to ``summary`` when there is one."""
        return f"{summary} Error: {error}" if error else summary

    def centered_column(*items):
        """Stack ``items`` vertically in a centered, non-wrapping FlexBox."""
        return pn.FlexBox(
            *items,
            align_items="center",
            justify_content="center",
            flex_wrap="nowrap",
            flex_direction="column",
        )

    # ------------------------------------------------------------------
    # Widgets
    # ------------------------------------------------------------------
    event_list_dropdown = pn.widgets.Select(
        name="Select Event List(s)",
        options=event_list_options(),
    )
    dt_input = pn.widgets.FloatInput(
        name="Select dt",
        value=1.0,
        step=0.0001,
        start=0.0000000001,
        end=1000.0,
    )
    norm_select = pn.widgets.Select(
        name="Normalization",
        options=["frac", "leahy", "abs", "none"],
        value="leahy",
    )
    segment_size_input = pn.widgets.FloatInput(name="Segment Size", value=10, step=1)
    multi_event_select = pn.widgets.MultiSelect(
        name="Or Select Event List(s) to Combine",
        options=event_list_options(),
        size=8,
    )
    floatpanel_plots_checkbox = pn.widgets.Checkbox(
        name="Add Plot to FloatingPanel", value=True
    )
    dataframe_checkbox = pn.widgets.Checkbox(
        name="Add DataFrame to FloatingPanel", value=False
    )
    rasterize_checkbox = pn.widgets.Checkbox(name="Rasterize Plots", value=True)

    # NOTE(review): the rebinning widgets, ``rebin_powerspectrum``,
    # ``time_info_pane`` and ``update_time_info`` below are neither added to
    # the returned layout nor called/registered anywhere in this function.
    # They are kept as-is; wire them up or remove them once the intent is
    # confirmed.
    linear_rebin_checkbox = pn.widgets.Checkbox(name="Linear Rebinning", value=False)
    log_rebin_checkbox = pn.widgets.Checkbox(name="Logarithmic Rebinning", value=False)
    rebin_with_original_checkbox = pn.widgets.Checkbox(
        name="Plot Rebin with Original", value=False
    )
    rebin_size_input = pn.widgets.FloatInput(
        name="Rebin Size",
        value=0.1,
        step=0.000001,
        start=0.01,
        end=1000.0,
    )
    time_info_pane = pn.pane.Markdown(
        "Select an event list to see time range", width=600
    )

    def update_time_info(event):
        """Show the first and last time stamp of the selected event list."""
        selected_index = event_list_dropdown.value
        if selected_index is None:
            time_info_pane.object = "Select an event list to see time range"
            return
        entry = context.state.get_event_data()[selected_index]
        event_list_name_ = entry[0]
        event_list = entry[1]
        start_time = event_list.time[0]
        end_time = event_list.time[-1]
        time_info_pane.object = (
            f"**Event List:** {event_list_name_} \n"
            f"**Start Time:** {start_time} \n"
            f"**End Time:** {end_time}"
        )

    # ------------------------------------------------------------------
    # Computation helpers
    # ------------------------------------------------------------------
    def create_dataframe(selected_event_list_index, dt, norm, segment_size):
        """
        Compute the averaged power spectrum and its DataFrame form.

        Returns:
            tuple: ``(df, ps, error)``. On success ``error`` is ``None``.
            On failure ``df`` and ``ps`` are ``None`` and ``error`` carries
            the service's message (``None`` when no index was supplied).
        """
        if selected_event_list_index is None:
            return None, None, None

        event_list = context.state.get_event_data()[selected_event_list_index][1]

        # Use spectrum service to create averaged power spectrum
        result = context.services.spectrum.create_averaged_power_spectrum(
            event_list=event_list,
            dt=dt,
            segment_size=segment_size,
            norm=norm,
        )
        if not result["success"]:
            # The error is returned rather than written to the output box
            # here, so the caller's own failure message cannot overwrite it.
            return None, None, result["message"]
        ps = result["data"]

        # Use export service to convert to DataFrame
        df_result = context.services.export.to_dataframe_power_spectrum(ps)
        if not df_result["success"]:
            return None, None, df_result["message"]
        return df_result["data"], ps, None

    def create_holoviews_panes(plot):
        """Wrap a HoloViews object in a fixed-size Panel pane."""
        return pn.pane.HoloViews(plot, width=600, height=600)

    def create_holoviews_plots(
        ps, title, dt, norm, segment_size, color_key=None, colorbar=True
    ):
        """
        Build the frequency/power curve for ``ps``.

        When the "Rasterize Plots" checkbox is ticked the curve is
        rasterized; ``color_key`` (if given) becomes a single-color cmap,
        otherwise "Viridis" is used. ``colorbar`` toggles the colorbar on
        the rasterized output. Without rasterization the plain curve is
        returned and ``color_key``/``colorbar`` have no effect.
        """
        label = f"{title} (dt={dt}, norm={norm}, segment={segment_size})"
        plot = hv.Curve((ps.freq, ps.power), label=label).opts(
            xlabel="Frequency (Hz)",
            ylabel="Power",
            width=600,
            height=600,
            shared_axes=False,
        )
        if not rasterize_checkbox.value:
            return plot
        cmap = [color_key] if color_key else "Viridis"
        return hd.rasterize(plot, line_width=3, pixel_ratio=2).opts(
            tools=["hover"],
            cmap=cmap,
            width=600,
            height=600,
            colorbar=colorbar,
        )

    def rebin_powerspectrum(ps):
        """Rebin ``ps`` per the rebin checkboxes; ``None`` if neither is set."""
        rebin_size = rebin_size_input.value
        if linear_rebin_checkbox.value:
            return ps.rebin(rebin_size, method="mean")
        elif log_rebin_checkbox.value:
            return ps.rebin_log(f=rebin_size)
        return None

    def create_dataframe_panes(df, title, dt, norm, segment_size):
        """Return a titled, centered column showing ``df``."""
        return centered_column(
            pn.pane.Markdown(
                f"**{title} (dt={dt}, norm={norm}, segment={segment_size})**"
            ),
            pn.pane.DataFrame(df, width=600, height=600),
        )

    def publish_plot(pane, title):
        """Send ``pane`` to the floating panel or the plots area."""
        if floatpanel_plots_checkbox.value:
            context.append_to_container(
                'float_panel',
                create_floatpanel_area(content=pane, title=title),
            )
        else:
            context.append_to_container(
                'plots',
                centered_column(pn.pane.Markdown(f"## {title}"), pane),
            )

    def selected_single_index():
        """
        Return the dropdown's selected index, or ``None`` after reporting
        why nothing can be processed.
        """
        if not context.state.get_event_data():
            report("No loaded event data available.")
            return None
        selected_index = event_list_dropdown.value
        if selected_index is None:
            report("No event list selected.")
        return selected_index

    # ------------------------------------------------------------------
    # Button callbacks
    # ------------------------------------------------------------------
    def generate_avg_powerspectrum(event=None):
        """Compute and publish the spectrum plot for the selected list."""
        selected_event_list_index = selected_single_index()
        if selected_event_list_index is None:
            return

        dt = dt_input.value
        norm = norm_select.value
        segment_size = segment_size_input.value

        df, ps, error = create_dataframe(
            selected_event_list_index, dt, norm, segment_size
        )
        if df is None:
            report(failure_text("Failed to create averaged power spectrum.", error))
            return

        plot_title = (
            f"Averaged Power Spectrum for {event_list_name(selected_event_list_index)}"
        )
        plot_hv = create_holoviews_plots(
            ps, title=plot_title, dt=dt, norm=norm, segment_size=segment_size
        )
        publish_plot(create_holoviews_panes(plot_hv), plot_title)
        report("Averaged Power Spectrum generated successfully.")

    def show_dataframe(event=None):
        """Compute and publish the spectrum DataFrame for the selected list."""
        selected_event_list_index = selected_single_index()
        if selected_event_list_index is None:
            return

        dt = dt_input.value
        norm = norm_select.value
        segment_size = segment_size_input.value

        df, ps, error = create_dataframe(
            selected_event_list_index, dt, norm, segment_size
        )
        if df is None:
            report(failure_text("Failed to create dataframe.", error))
            return

        name = event_list_name(selected_event_list_index)
        dataframe_output = create_dataframe_panes(
            df, f"{name}", dt, norm, segment_size
        )
        if dataframe_checkbox.value:
            context.append_to_container(
                'float_panel',
                create_floatpanel_area(
                    content=dataframe_output,
                    title=f"DataFrame for {name}",
                ),
            )
        else:
            context.append_to_container('plots', dataframe_output)
        report("DataFrame generated successfully.")

    def combine_selected_plots(event=None):
        """Overlay the spectra of every list chosen in the multi-select."""
        selected_event_list_indices = multi_event_select.value
        if not selected_event_list_indices:
            report("No event lists selected.")
            return

        # Widget values do not change during the callback; read them once.
        dt = dt_input.value
        norm = norm_select.value
        segment_size = segment_size_input.value

        combined_plots = []
        combined_title = []
        failures = []
        for index in selected_event_list_indices:
            df, ps, error = create_dataframe(index, dt, norm, segment_size)
            name = event_list_name(index)
            if df is None:
                failures.append(f"{name}: {error}" if error else f"{name}")
                continue
            combined_plots.append(
                create_holoviews_plots(
                    ps,
                    title=name,
                    dt=dt,
                    norm=norm,
                    segment_size=segment_size,
                    colorbar=False,
                )
            )
            combined_title.append(name)

        if not combined_plots:
            report(failure_text("Failed to combine plots.", "; ".join(failures)))
            return

        combined_plot = hv.Overlay(combined_plots).opts(shared_axes=False)
        publish_plot(
            create_holoviews_panes(combined_plot), " + ".join(combined_title)
        )

        success_message = "Combined plots generated successfully."
        if failures:
            # Some lists were left out; say which ones and why.
            success_message += f" Skipped: {'; '.join(failures)}"
        report(success_message)

    # ------------------------------------------------------------------
    # Buttons and layout
    # ------------------------------------------------------------------
    generate_powerspectrum_button = pn.widgets.Button(
        name="Generate Averaged Power Spectrum", button_type="primary"
    )
    generate_powerspectrum_button.on_click(generate_avg_powerspectrum)

    combine_plots_button = pn.widgets.Button(
        name="Combine Selected Plots", button_type="success"
    )
    combine_plots_button.on_click(combine_selected_plots)

    show_dataframe_button = pn.widgets.Button(
        name="Show DataFrame", button_type="primary"
    )
    show_dataframe_button.on_click(show_dataframe)

    tab_content = pn.Column(
        event_list_dropdown,
        dt_input,
        norm_select,
        segment_size_input,
        multi_event_select,
        floatpanel_plots_checkbox,
        dataframe_checkbox,
        rasterize_checkbox,
        pn.Row(
            generate_powerspectrum_button, show_dataframe_button, combine_plots_button
        ),
    )
    return tab_content
def create_quicklook_avg_powerspectrum_main_area(context: AppContext):
    """
    Build the main area for the QuickLook averaged power spectrum page.

    A fresh warning handler is created first and passed, together with
    ``context``, to the tab builder. The resulting tab is the only entry
    of the MainArea.

    Args:
        context: Application context forwarded to the tab builder.

    Returns:
        MainArea: Main area with a single "Averaged Power Spectrum" tab.
    """
    handler = create_warning_handler()
    avg_powerspectrum_tab = create_avg_powerspectrum_tab(
        context=context,
        warning_handler=handler,
    )
    return MainArea(tabs_content={"Averaged Power Spectrum": avg_powerspectrum_tab})
def create_quicklook_avg_powerspectrum_area():
    """
    Build the plots area for the QuickLook averaged power spectrum page.

    Returns:
        PlotsContainer: A new PlotsContainer constructed without arguments.
    """
    plots_area = PlotsContainer()
    return plots_area