# Spaces: Runtime error
from typing import Optional, Iterable | |
import streamlit as st | |
from streamlit import cursor | |
from streamlit.report_thread import get_report_ctx as get_script_run_ctx | |
#from streamlit.script_run_context import get_script_run_ctx | |
from streamlit.errors import NoSessionContext | |
from streamlit.proto import Block_pb2 | |
from streamlit.proto import ForwardMsg_pb2 | |
from streamlit.elements.form import FormData, current_form_id | |
from streamlit.delta_generator import DeltaGenerator | |
def _block(self, block_proto=None) -> "DeltaGenerator":
    """Add a block element to the app and return a DeltaGenerator for it.

    This implementation performs no nesting validation: a column or
    expander may be created inside a parent block of the same type.

    Args:
        block_proto: The ``Block_pb2.Block`` message copied into the outgoing
            ``ForwardMsg``. When omitted, an empty ``Block_pb2.Block()`` is
            used.

    Returns:
        The active DeltaGenerator itself when it has no root container or no
        cursor; otherwise a new DeltaGenerator whose parent is the active one
        and whose cursor is a fresh ``RunningCursor`` for the new block.

    Raises:
        NoSessionContext: Propagated from ``_enqueue_message`` when there is
            no script run context to enqueue the message on.
    """
    # Build the default per call instead of sharing one proto instance that
    # was created when the function was defined.
    if block_proto is None:
        block_proto = Block_pb2.Block()

    # Operate on the active DeltaGenerator, in case we're in a `with` block.
    dg = self._active_dg

    block_type = block_proto.WhichOneof("type")

    if dg._root_container is None or dg._cursor is None:
        return dg

    msg = ForwardMsg_pb2.ForwardMsg()
    msg.metadata.delta_path[:] = dg._cursor.delta_path
    msg.delta.add_block.CopyFrom(block_proto)

    # Normally we'd return a new DeltaGenerator that uses the locked cursor
    # below. But in this case we want to return a DeltaGenerator that uses
    # a brand new cursor for this new block we're creating.
    block_cursor = cursor.RunningCursor(
        root_container=dg._root_container,
        parent_path=dg._cursor.parent_path + (dg._cursor.index,),
    )
    block_dg = DeltaGenerator(
        root_container=dg._root_container,
        cursor=block_cursor,
        parent=dg,
        block_type=block_type,
    )

    # Blocks inherit their parent form ids.
    # NOTE: Container form ids aren't set in proto.
    block_dg._form_data = FormData(current_form_id(dg))

    # Must be called to increment this cursor's index.
    dg._cursor.get_locked_cursor(last_index=None)
    _enqueue_message(msg)

    return block_dg
def _enqueue_message(msg):
    """Queue a ForwardMsg proto on the current script run context.

    Raises NoSessionContext when ``get_script_run_ctx()`` returns None.
    """
    run_ctx = get_script_run_ctx()
    if run_ctx is not None:
        run_ctx.enqueue(msg)
        return
    raise NoSessionContext()
# Monkeypatch: rebind DeltaGenerator._block to the module-level `_block`
# function, so every DeltaGenerator instance uses this implementation.
DeltaGenerator._block = _block