# CloudCall / streamlit_app.py
# vincentiusyoshuac's picture
# Update streamlit_app.py
# 17ea9fc verified
import streamlit as st
from streamlit_webrtc import VideoProcessorBase, webrtc_streamer
import uuid
import requests
import json
from typing import Dict, Optional
import time
class VideoProcessor(VideoProcessorBase):
    """Pass-through video processor for the WebRTC stream."""

    def recv(self, frame):
        """Return the incoming frame unchanged.

        The value returned here is forwarded to the outgoing media track,
        so it has to be a frame object rather than a raw pixel array. No
        pixel processing is done, so the frame is handed back as-is, which
        also keeps its timing metadata intact.

        To process pixels later, convert with
        ``frame.to_ndarray(format="bgr24")``, modify the array, and rebuild
        a frame from it before returning.
        """
        return frame
class WebRTCConnection:
    """HTTP client for the signaling server's peer/offer/answer endpoints.

    Every public method returns a dict. When the request fails, or the
    server replies with something that is not a JSON object, the problem is
    reported through ``st.error`` and a
    ``{"status": "error", "message": ...}`` dict is returned instead of
    raising, so callers can always use ``.get`` on the result.
    """

    DEFAULT_SERVER_URL = "http://localhost:7860"
    REQUEST_TIMEOUT = 5  # seconds, applied to every request

    def __init__(self, server_url: str = DEFAULT_SERVER_URL):
        """Create a client for the signaling server at *server_url*.

        Args:
            server_url: Base URL of the signaling server. A trailing slash
                is dropped so endpoint paths can be appended uniformly.
        """
        self.server_url = server_url.rstrip("/")
        self.peer_id: Optional[str] = None

    @staticmethod
    def _failure(prefix: str, error: Exception) -> Dict:
        """Show *error* in the UI and return the matching error dict."""
        st.error(f"{prefix}: {str(error)}")
        return {"status": "error", "message": str(error)}

    def _exchange(self, send, endpoint: str, **payload) -> Dict:
        """Call *endpoint* with *send* and return the decoded JSON object.

        Args:
            send: The ``requests`` callable to use (``requests.post`` or
                ``requests.get``).
            endpoint: Path below ``server_url``, without a leading slash.
            **payload: Extra keyword arguments for *send* (``json=`` or
                ``params=``).
        """
        try:
            response = send(
                f"{self.server_url}/{endpoint}",
                timeout=self.REQUEST_TIMEOUT,
                **payload,
            )
        except requests.exceptions.RequestException as e:
            return self._failure("Connection error", e)

        # Decoding is handled separately: a non-JSON body raises ValueError,
        # which is not guaranteed to be a RequestException.
        try:
            body = response.json()
        except ValueError as e:
            return self._failure("Invalid server response", e)

        if not isinstance(body, dict):
            return self._failure(
                "Invalid server response",
                TypeError(f"expected a JSON object, got {type(body).__name__}"),
            )
        return body

    def register_peer(self, peer_id: str) -> Dict:
        """POST *peer_id* to ``/register_peer`` and return the server reply."""
        return self._exchange(
            requests.post,
            "register_peer",
            json={"peer_id": peer_id},
        )

    def send_offer(self, peer_id: str, offer: Dict) -> Dict:
        """POST *offer* for *peer_id* to ``/send_offer`` and return the reply."""
        return self._exchange(
            requests.post,
            "send_offer",
            json={"peer_id": peer_id, "offer": offer},
        )

    def get_answer(self, peer_id: str) -> Dict:
        """GET ``/get_answer`` for *peer_id* and return the server reply."""
        return self._exchange(
            requests.get,
            "get_answer",
            params={"peer_id": peer_id},
        )
# CSS injected into the page for the responsive 16:9 video container.
_VIDEO_CONTAINER_CSS = """
<style>
.video-container {
position: relative;
width: 100%;
padding-bottom: 56.25%;
height: 0;
overflow: hidden;
}
.video-container iframe {
position: absolute;
top: 0;
left: 0;
width: 100%;
height: 100%;
}
</style>
"""

# ICE servers handed to the browser: one STUN server plus a TURN relay.
_RTC_CONFIGURATION = {
    "iceServers": [
        {"urls": ["stun:stun.l.google.com:19302"]},
        {
            "urls": ["turn:relay.metered.ca:80"],
            "username": "openrelayproject",
            "credential": "openrelayproject"
        }
    ]
}

# Capture constraints requested from the browser.
_MEDIA_STREAM_CONSTRAINTS = {
    "video": {
        "width": {"ideal": 640},
        "height": {"ideal": 480},
        "frameRate": {"max": 30}
    },
    "audio": True
}


def _ensure_session_defaults():
    """Seed the session-state keys the page reads, if they are missing."""
    if 'connection' not in st.session_state:
        st.session_state.connection = WebRTCConnection()
    if 'status' not in st.session_state:
        st.session_state.status = "Initializing..."
    if 'peer_id' not in st.session_state:
        st.session_state.peer_id = None


def _select_peer_id():
    """Render the peer-ID controls and store the chosen ID in session state."""
    peer_id_option = st.radio(
        "Peer ID Option",
        ("Generate", "Enter"),
        key="peer_id_option"
    )
    if peer_id_option == "Generate":
        # The script is re-executed on every interaction, so the generated
        # ID is created once and reused; otherwise the ID shown to the user
        # would change on every click and after each rerun.
        if 'generated_peer_id' not in st.session_state:
            st.session_state.generated_peer_id = str(uuid.uuid4())[:8]
        st.session_state.peer_id = st.session_state.generated_peer_id
        st.code(st.session_state.peer_id, language=None)
    else:
        typed_id = st.text_input(
            "Enter Peer ID",
            key="peer_id_input",
            max_chars=8,
            placeholder="Enter 8-character ID"
        )
        # Whitespace-only input counts as "no ID entered".
        st.session_state.peer_id = (typed_id or "").strip()


def _handle_connect():
    """Register the current peer ID and mark the session as connected."""
    if not st.session_state.peer_id:
        st.error("Please enter or generate a Peer ID")
        return

    response = st.session_state.connection.register_peer(
        st.session_state.peer_id
    )
    if response.get("status") == "registered":
        st.session_state.status = "Connected"
        st.rerun()  # Using st.rerun() instead of experimental_rerun
    else:
        # Make a rejected registration visible instead of silently ignoring it.
        detail = response.get("message") or response
        st.warning(f"Peer registration was not confirmed: {detail}")


def _render_call():
    """Show the WebRTC streamer and the End Call button."""
    with st.container():
        st.markdown("<div class='video-container'>", unsafe_allow_html=True)
        webrtc_streamer(
            key="local",
            video_processor_factory=VideoProcessor,
            rtc_configuration=_RTC_CONFIGURATION,
            media_stream_constraints=_MEDIA_STREAM_CONSTRAINTS
        )
        st.markdown("</div>", unsafe_allow_html=True)

    if st.button("End Call", key="end_call", use_container_width=True):
        st.session_state.status = "Call ended"
        st.rerun()  # Using st.rerun() instead of experimental_rerun


def create_video_interface():
    """Build the Streamlit video-call page.

    Configures the page, seeds session state, renders the peer-ID picker,
    the status metric and the Connect button, injects the video-container
    CSS and, while the session status is "Connected", shows the WebRTC
    streamer together with an End Call button.
    """
    st.set_page_config(
        page_title="Video Call",
        page_icon="🎥",
        layout="wide",
        initial_sidebar_state="collapsed"
    )

    _ensure_session_defaults()

    st.title("📱 Android Video Call")

    # Create two columns for better mobile layout
    col1, col2 = st.columns(2)
    with col1:
        _select_peer_id()
    with col2:
        st.metric(
            label="Status",
            value=st.session_state.status
        )

    if st.button("Connect", key="connect_button", use_container_width=True):
        _handle_connect()

    # Video container with responsive layout
    st.markdown(_VIDEO_CONTAINER_CSS, unsafe_allow_html=True)

    if st.session_state.status == "Connected":
        _render_call()
# Build the page when this file is executed as a script.
if __name__ == "__main__":
    create_video_interface()