foxdot / Consumer.py
shoom013's picture
Update Consumer.py
4b4b7ee verified
import asyncio
import requests
import io as i_o
import contextlib
import sys
import concurrent.futures
from FoxDot import *
# Configuration
API_URL = "https://houseofplaying.com/hop/api.php"
POLL_INTERVAL = 1 # Seconds
def change_key(amt = 3, scl="minor", duration=4, flag=True):
Root.default = ((Root.default+amt) % 14)-5
print(f"Key changed to {Root.default}")
Scale.default = scl
if flag:
Clock.future(duration, change_key, args=[amt, scl, duration])
def change_cycle(notes, scales, plen, flag=True):
if (Clock.now() % plen) == 0:
Scale.default = scales[(Clock.now()/plen) % len(notes)]
Root.default = notes[(Clock.now()/plen) % len(notes)]
if flag:
change_cycle(notes, scales, plen)
# Global dictionary to store active tasks and their cancellation tokens (or just the futures)
active_tasks = {}
def poll_server():
try:
response = requests.get(f"{API_URL}?action=poll")
if response.status_code == 200:
return response.json()
except Exception as e:
print(f"Error polling server: {e}")
return None
def update_server(uid, output, status):
try:
data = {'uid': uid, 'output': output, 'status': status}
requests.post(f"{API_URL}?action=update", json=data)
except Exception as e:
print(f"Error updating server: {e}")
def stop(uid):
"""Stops the execution of the cell with the given uid."""
if uid in active_tasks:
print(f"Stopping task {uid}...")
task = active_tasks[uid]
if task != asyncio.current_task():
task.cancel()
return f"Stopped {uid}"
else:
return "Cannot stop itself."
return f"Task {uid} not found or not running."
def stopall():
"""Stops all running executions."""
count = 0
for uid in list(active_tasks.keys()):
if active_tasks[uid] != asyncio.current_task():
stop(uid)
count += 1
try:
# Check if an event loop is already running
try:
oldloop = asyncio.get_running_loop()
print("Starting new event loop...")
loop = asyncio.run(main())
oldloop.stop()
except RuntimeError:
loop = asyncio.run(main())
oldloop.stop()
if oldloop and oldloop.is_running():
print("Event loop already running. Scheduling main task...")
loop.create_task(main())
else:
print("Starting new event loop...")
loop=asyncio.run(main())
loop.run_forever()
except KeyboardInterrupt:
print("Stopping consumer...")
return f"Stopped {count} tasks, killed main loop and restarted"
def execute_code_sync(code, uid):
# Capture stdout
stdout_capture = i_o.StringIO()
status = "completed"
output = ""
# Inject stop functions into locals
local_scope = locals()
local_scope['stop'] = stop
local_scope['stopall'] = stopall
local_scope['change_key'] = change_key
local_scope['change_cycle'] = change_cycle
try:
with contextlib.redirect_stdout(stdout_capture):
# Execute the code
# Note: exec() is dangerous if not sandboxed
exec(code, globals(), local_scope)
output = stdout_capture.getvalue()
except Exception as e:
status = "error"
output = f"{e}"
print(f"An unexpected error occurred in {uid}: {e}")
return output, status
async def process_task(uid, code):
#global loop
#global active_tasks
print(f"Processing {uid}...")
try:
# API: INSERT ... ON DUPLICATE KEY UPDATE. So we CAN update log entries.
# Step 1: Set status to 'wait'
update_server(uid, "Execution started...", "wait")
# Step 2: Run code in thread
loop = asyncio.get_running_loop()
# We wrap the sync execution in a Task to allow cancellation
output, status = await loop.run_in_executor(None, execute_code_sync, code, uid)
# Step 3: Update with final result
update_server(uid, output, status)
except asyncio.CancelledError:
print(f"Task {uid} cancelled.")
update_server(uid, "Execution cancelled.", "error")
except Exception as e:
print(f"Error in task {uid}: {e}")
update_server(uid, str(e), "error")
finally:
if uid in active_tasks:
del active_tasks[uid]
async def main():
print(f"Starting FoxDot Consumer (Async)... Polling {API_URL}")
while True:
# Move polling to thread to avoid blocking loop
task_data = await asyncio.to_thread(poll_server)
if task_data:
uid = task_data.get('uid')
code = task_data.get('code')
print(f"Received task {uid}")
# Create async task
taskold = None
if uid in active_tasks:
taskold = active_tasks[uid]
task = asyncio.create_task(process_task(uid, code))
active_tasks[uid] = task
if taskold and taskold.is_running():
taskold.cancel()
await asyncio.sleep(POLL_INTERVAL)
if __name__ == "__main__":
try:
# Check if an event loop is already running
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop and loop.is_running():
print("Event loop already running. Scheduling main task...")
loop.create_task(main())
# Note: We assume the existing loop will keep running.
# If this is a script that just exits, we might need loop.run_forever() if not already happening.
else:
print("Starting new event loop...")
loop = asyncio.run(main())
loop.run_forever()
except KeyboardInterrupt:
print("Stopping consumer...")