dcascade-app / pages /01_dcascade.py
Kelbec
fix output charts
b851993
import os
from pathlib import Path
import shutil
from typing import Optional, cast
from ipyleaflet import Map, GeoData, basemaps, LayersControl, TileLayer
import geopandas as gpd
import numpy as np
import solara
from solara import FigureEcharts
from solara.components.file_drop import FileInfo
from dcascade_py import dcascade_py
import xarray as xr
from shapely.geometry import LineString,MultiLineString
import time
import shutil
file_nc = solara.reactive("")
file_shp = solara.reactive("")
zoom = solara.reactive(6)
center = solara.reactive((20, 0))
global_map_loaded = solara.reactive(False)
variables = solara.reactive([])
sel_var = solara.reactive("")
ds = solara.reactive(None)
feature_properties = solara.reactive(None)
geo_data = solara.reactive(None)
sel_feature = solara.reactive(None)
timescale = solara.reactive("50")
sed_range = solara.reactive("-8,5")
class_size = solara.reactive(2.5)
deposit = solara.reactive(100000.0)
continuous_update = solara.reactive(True)
formulas = ["Wilkock and Crowe 2003", "Engelund and Hansen 1967"] #,"Parker and Klingeman 1982", "Yang formula 1989", "Wong and Parker 2006", "Ackers and White formula 1973"]
formula = solara.reactive("Engelund and Hansen 1967")
partitionings = ["Direct", "Bed material fraction (BMF)", "Transport capacity function (TCF)", "Shear stress correction approach"]
# - Engelund and Hansen with Direct approach\n
# - Engelund and Hansen with Bed material fraction (BMF) approach\n
# - Engelund and Hansen with Transport capacity function (TCF)\n
# - Wilkock and Crowe with Shear stress correction\n""")
partitioning = solara.reactive("Bed material fraction (BMF)")
part_mapping = {
"Direct": 1,
"Bed material fraction (BMF)": 2,
"Transport capacity function (TCF)": 3,
"Shear stress correction approach": 4
}
formula_mapping = {
"Wilkock and Crowe 2003": 2,
"Engelund and Hansen 1967": 3
}
chart_options = solara.reactive({
"line": {
"title": {"text": "Variable"},
"tooltip": {},
"legend": {"data": ["Variable"]},
"xAxis": {"name": "Time step","nameLocation": "middle","nameGap": 30, "data":list(range(1, int(timescale.value)+1))}, # {"type": "category"},
"yAxis": {"name": "Variable","nameLocation": "middle","nameGap": 60},
"series": [{
"type": "line",
"universalTransition": True,
"data":[]
}]
}
})
maps = {
"OpenStreetMap.Mapnik": basemaps.OpenStreetMap.Mapnik,
"OpenTopoMap": basemaps.OpenTopoMap,
"Esri.WorldTopoMap": basemaps.Esri.WorldTopoMap,
}
map_name = solara.reactive(list(maps)[0])
def create_data_archive():
# Specify the folder path to be zipped
folder_to_zip = file_shp.value.split("/deposito")[-2]
# Zip the folder
shutil.make_archive(folder_to_zip,'zip',f"{os.getcwd()}/public/data/",os.path.basename(folder_to_zip))
def on_variable_change(variable):
print("feature",feature_properties.value)
sel_var.set(variable)
if feature_properties.value is None:
solara.Warning(f"Select a feature on the map first",
text=True,
dense=True,
icon=True,
)
else:
fid = feature_properties.value['fromn']
reach = feature_properties.value['reach_id']
data = ds.value[sel_var.value].values[:,fid-1] # On nc file the index starts from 0, fromn starts from 1
# substitute nan values with False
data = np.where(np.isnan(data), None, data)
chart_options.set({
"line": {
"title": {"text": f"{sel_var.value} {reach} ({fid})"},
"tooltip": {},
"legend": {"data": [f"{sel_var.value} {fid}"]},
"xAxis": {"name": "Time step","nameLocation": "middle","nameGap": 30,"data":list(range(1, int(timescale.value)+1))}, # {"type": "category"},
"yAxis": {"name": f"{sel_var.value} {fid}","nameLocation": "middle","nameGap": 60},
"emphasis": {"itemStyle": {"shadowBlur":10}},
"series": [{
"type": "line",
"universalTransition": True,
"data": data.tolist()
}]
}
})
# Define a callback function for the click event
def on_feature_click(feature, **kwargs):
properties = feature['properties']
print("Clicked on:", properties)
feature_properties.set(properties)
sel_feature.set(feature)
if(file_nc.value):
ds.set(xr.open_dataset(file_nc.value, decode_times=False))
variables.set(list(ds.value.keys()))
# ds_dec = xr.decode_cf(ds,decode_timedelta=False)
fid = properties['fromn']
reach = properties['reach_id']
data = ds.value[sel_var.value].values[:,fid-1] # On nc file the index starts from 0, fromn starts from 1
# substitute nan values with False
data = np.where(np.isnan(data), None, data)
chart_options.set({
"line": {
"title": {"text": f"{sel_var.value} {reach} ({fid})"},
"tooltip": {},
"legend": {"data": f"{sel_var.value} {fid}"},
"xAxis": {"name": "Time step","nameLocation": "middle","nameGap": 30,"data":list(range(1, int(timescale.value)+1))}, # {"type": "category"},
"yAxis": {"name": f"{sel_var.value} {fid}","nameLocation": "middle","nameGap": 60},
"emphasis": {"itemStyle": {"shadowBlur":10}},
"series": [{
"type": "line",
"universalTransition": True,
"data": data.tolist()
}]
}
})
ds.value.close()
@solara.component
def MapComponent():
# Isolation is required to prevent the map from overlapping navigation (when screen width < 960px)
with solara.Column(style={"isolation": "isolate"}):
sel_geo_data = None
if not global_map_loaded.value:
#sel_df = xr.open_dataset(file_nc.value) # os.getcwd()+'/public/deposito.shp'
sel_df = gpd.read_file(file_shp.value)
geo_df = sel_df.to_crs(4326) # 32634
geo_data.set(GeoData(geo_dataframe=geo_df,
hover_style={'fillColor': 'red' , 'fillOpacity': 5.2}, name="deposito"))
geo_data.value.on_click(on_feature_click)
center.set((geo_df.total_bounds[1], geo_df.total_bounds[0]))
geo_data.value.style = {'color': 'black', 'fillColor': '#3366cc', 'opacity':0.5, 'weight':3.9, 'dashArray':'2', 'fillOpacity':0.1}
# sel_df.close()
m = Map(center=center.value, zoom = zoom.value, basemap= basemaps.Esri.WorldTopoMap)
map_type = maps[map_name.value]
url = map_type.build_url()
m.add(geo_data.value)
m.add(LayersControl())
if sel_feature.value is not None:
# print(type(sel_feature.value['geometry']['coordinates']))
try:
# Extracting the geometry
geometry = LineString(sel_feature.value['geometry']['coordinates'])
# Creating GeoDataFrame
gdf = gpd.GeoDataFrame([sel_feature.value['properties']], geometry=[geometry])
# Set the CRS (Coordinate Reference System) if known
gdf.crs = "EPSG:4326"
# Set the CRS (Coordinate Reference System) if known
sel_geo_data = GeoData(geo_dataframe=gdf)
sel_geo_data.style = {'color': 'red', 'fillColor': '#3366cc', 'opacity':0.5, 'weight':15.0, 'dashArray':'5'}
except Exception as e:
print(e)
sel_geo_data = None
try:
# Extracting the geometry
geometry = MultiLineString(sel_feature.value['geometry']['coordinates'])
# Creating GeoDataFrame
gdf = gpd.GeoDataFrame([sel_feature.value['properties']], geometry=[geometry])
# Set the CRS (Coordinate Reference System) if known
gdf.crs = "EPSG:4326"
# Set the CRS (Coordinate Reference System) if known
sel_geo_data = GeoData(geo_dataframe=gdf)
sel_geo_data.style = {'color': 'red', 'fillColor': '#3366cc', 'opacity':0.5, 'weight':15.0, 'dashArray':'5'}
except Exception as e:
print(e)
sel_geo_data = None
if sel_geo_data:
m.element( # type: ignore
zoom=zoom.value,
on_zoom=zoom.set,
center=center.value,
on_center=center.set,
scroll_wheel_zoom=True,
layers=[
TileLayer.element(url=url),
# Marker.element(location=marker_location.value, draggable=True, on_location=location_changed),
geo_data.value,
sel_geo_data
]
)
else:
m.element( # type: ignore
zoom=zoom.value,
on_zoom=zoom.set,
center=center.value,
on_center=center.set,
scroll_wheel_zoom=True,
layers=[
TileLayer.element(url=url),
# Marker.element(location=marker_location.value, draggable=True, on_location=location_changed),
geo_data.value
]
)
global_map_loaded.set(True)
solara.Select(label="Variable", value=sel_var, values=variables.value, on_value=on_variable_change)
FigureEcharts(option=chart_options.value["line"])
create_data_archive()
print(os.environ["SOLARA_HOST"])
solara.Markdown(f"[**Click here to download output files**](http://{os.environ['SOLARA_HOST']}:8765/static/public/data/{file_shp.value.split('/deposito')[-2].split('/')[-1]}.zip)")
# solara.Markdown(f'<a href=http://localhost:8765/static/public/data/{file_shp.value.split("/deposito")[-2].split("/")[-1]}.zip target="_blank">Download file</a>')
@solara.component
def Page():
with solara.Column(style={"min-width": "500px"}):
content, set_content = solara.use_state(b"")
uploaded, set_uploaded = solara.use_state(b"")
filename, set_filename = solara.use_state("")
river_filename, set_river_filename = solara.use_state("")
q_filename, set_q_filename = solara.use_state("")
error_select_csv, set_error_select_csv = solara.use_state("")
error_select_shp, set_error_select_shp = solara.use_state("")
no_file_selected, set_no_file_selected = solara.use_state("")
# removed_data, set_removed = solara.use_state("")
size, set_size = solara.use_state(0)
file_browser, set_file_browser = solara.use_state(cast(Optional[Path], None))
map_loaded, set_map_loaded = solara.use_state(False)
def on_file(file: FileInfo):
set_filename(file["name"])
set_size(file["size"])
f = file["file_obj"]
set_content(f.read())
set_uploaded(False)
def upload_file():
if content:
file_path = os.path.join("public",filename)
with open(file_path,"wb") as file:
file.write(content)
if filename.endswith(".zip"):
shutil.unpack_archive(os.getcwd()+"/public/"+filename,os.getcwd()+"/public")
os.remove(os.getcwd()+"/public/"+filename)
set_uploaded(True)
def open_river_file():
if file_browser:
file_path = os.path.join(os.getcwd(),file_browser)
if file_path.endswith(".shp"):
set_river_filename(file_path)
set_error_select_shp(False)
set_no_file_selected(False)
else:
set_error_select_shp(True)
else:
set_error_select_shp(True)
def open_q_file():
if file_browser:
file_path = os.path.join(os.getcwd(),file_browser)
if file_path.endswith(".csv"):
set_q_filename(file_path)
set_error_select_csv(False)
else:
set_error_select_csv(True)
else:
set_error_select_csv(True)
def remove_data():
for file in os.listdir(os.getcwd()+"/public"):
os.remove(os.getcwd()+"/public/"+file)
def run_dcascade():
if (river_filename == "" or q_filename == ""):
set_no_file_selected(True)
else:
if ds.value is not None:
ds.value.close()
set_no_file_selected(False)
# sel_form = formulas.index(str(formula).replace("'",""))+1
# sel_part = partitionings.index(str(partitioning).replace("'",""))+1
sel_form = formula_mapping[formula.value]
sel_part = part_mapping[partitioning.value]
# sel_timescale = int(str(timescale).replace("'",""))
sel_timescale = int(timescale.value)
tmp_options = chart_options.value
tmp_options["line"]["xAxis"]["data"] = list(range(1, sel_timescale+1))
chart_options.set(tmp_options)
set_map_loaded(False)
global_map_loaded.set(False)
river = river_filename
q = q_filename
current_time = str(int(time.time()))
os.makedirs(os.getcwd()+"/public/data/"+current_time)
out = f"{os.getcwd()}/public/data/{current_time}/deposito"
out_nc = out+".nc" #".shp"
out_shp = out+".shp"
file_nc.set(out_nc)
file_shp.set(out_shp)
# out_prj = out+".prj"
# shutil.copyfile(river,out_prj)
dcascade_py(river, q, sed_range=sed_range.value, class_size=class_size.value, deposit=deposit.value, timescale=sel_timescale, formula=sel_form, partitioning=sel_part, out=out_nc, version=False, verbose=False, debug=False, credits=False, nc=True)
set_map_loaded(True)
# filename = os.getcwd()+'/public/deposito.nc'
ds.set(xr.open_dataset(out_nc, decode_times=False))
variables.set(list(ds.value.keys()))
sel_var.set(variables.value[0])
ds.value.close()
with solara.Div() as main:
with solara.Card("Dcascade"):
if map_loaded:
MapComponent()
with solara.Card("Upload files"):
solara.Markdown("Required \*.shp, \*.shx, \*.dbf and \*.prj files for **river** file and \*.csv for **q** file. Upload files one at the time or compressed in a *zip* archive.")
with solara.ColumnsResponsive(1, large=6):
solara.FileDrop(
label="Drag and drop a file here ",
on_file=on_file,
lazy=True,
)
solara.Button("Upload file",
color="primary",
on_click=upload_file)
if content and not uploaded:
solara.Info(f"File {filename} has total length: {size}\n",
text=True,
dense=True,
icon=True,
)
# solara.Preformatted("\n".join(textwrap.wrap(repr(content))))
# solara.Preformatted("\n".join(textwrap.wrap(repr(filename))))
elif uploaded:
solara.Success(
f"{filename} uploaded",
text=True,
dense=True,
icon=True,
)
with solara.Card("Select parameters"):
with solara.ColumnsResponsive(3, large=4):
solara.InputText("Timescale (days)", value=timescale, continuous_update=continuous_update.value)
solara.Select(label="Formula", value=formula, values=formulas)
# solara.Markdown(f"**Selected**: {formula.value}")
solara.Select(label="Partitioning ", value=partitioning, values=partitionings)
# solara.Markdown(f"**Selected**: {partitioning.value}")
solara.InputText(label="Sed range ", value=sed_range)
solara.InputFloat(label="Class size ", value=class_size)
solara.InputFloat(label="Deposit ", value=deposit)
with solara.Card("Select files"):
solara.Markdown("Select **river** and **q** files from file explorer below, the uploaded files can be found in /public folder. To select files click on the explorer the click on *SELECT RIVER FILE* or *SELECT Q FILE* based on the file selected.")
with solara.ColumnsResponsive(3, large=6):
solara.Markdown(f"**River file**: {river_filename}")
solara.Button("Select river file",
color="primary",
on_click=open_river_file)
with solara.ColumnsResponsive(3, large=6):
solara.Markdown(f"**Q file**: {q_filename}")
solara.Button("Select Q file",
color="primary",
on_click=open_q_file)
if error_select_csv:
solara.Warning(f"Q file must be csv",
text=True,
dense=True,
icon=True,
)
if error_select_shp:
solara.Warning(f"River file must be shp",
text=True,
dense=True,
icon=True,
)
solara.FileBrowser(
can_select=False,
on_file_open=set_file_browser,
directory="./public"
)
with solara.Card("Run model"):
with solara.ColumnsResponsive(3, large=6):
solara.Markdown("Click on DCASCADE button to run the model and generate the output map")
solara.Button("DCASCADE",
color="primary",
on_click=run_dcascade)
if no_file_selected:
solara.Warning(f"Select both River and Q files",
text=True,
dense=True,
icon=True,
)
# solara.Button("REMOVE DATA",
# color="secondary",
# on_click=remove_data)
# solara.Info(f"You are in directory: {directory}")
# solara.Info(f"You selected path: {path}")
# solara.Info(f"You opened file: {file}")
# if removed_data:
# solara.Success(
# f"Removed data",
# text=True,
# dense=True,
# icon=True,
# )