| import os |
| import sys |
| import io |
| import re |
| import atexit |
| import threading |
| import time |
| import logging |
| from collections import deque |
| from pathlib import Path |
| from flask import Flask, request, jsonify, render_template_string, send_file, send_from_directory |
|
|
| from settings_manager import SettingsManager |
| from autoblow_controller import AutoblowController as HandyController |
| from llm_service import LLMService |
| from audio_service import AudioService |
| from background_modes import AutoModeThread, auto_mode_logic, milking_mode_logic, edging_mode_logic |
| from enhanced_background_modes import EnhancedModeRunner |
|
|
| |
# --- Application and service initialisation (runs at import time) ---
app = Flask(__name__)
# Chat endpoint of the LLM backend; overridable through the OLLAMA_URL environment variable.
LLM_URL = os.getenv("OLLAMA_URL", "http://127.0.0.1:11434/api/chat")
settings = SettingsManager(settings_file_path="my_settings.json")
settings.load()


# Device controller (AutoblowController, imported under the HandyController alias),
# configured with the stored key and the stored speed/depth limits.
handy = HandyController(settings.handy_key)
handy.update_settings(settings.min_speed, settings.max_speed, settings.min_depth, settings.max_depth)


llm = LLMService(url=LLM_URL)
audio = AudioService()
# Voice setup only happens when a key is stored and set_api_key() reports success.
if settings.elevenlabs_api_key:
    if audio.set_api_key(settings.elevenlabs_api_key):
        audio.fetch_available_voices()
        audio.configure_voice(settings.elevenlabs_voice_id, True)


# --- Mutable module-level state shared by the routes and the background modes ---
chat_history = deque(maxlen=20)  # only the 20 most recent entries are kept
messages_for_ui = deque()  # outgoing messages, handed out by /poll_messages
auto_mode_active_task = None  # the running AutoModeThread, or None
current_mood = "Curious"
use_long_term_memory = True
calibration_pos_mm = 0.0  # /nudge treats 0.0 as "not read from the device yet"
user_signal_event = threading.Event()  # set by /signal_edge
mode_message_queue = deque(maxlen=5)  # user messages relayed to the active mode
edging_start_time = None  # time.time() taken when edging mode starts, else None


# Runner for the pattern-based modes; it is given the same device controller.
pattern_mode_runner = EnhancedModeRunner(handy)


# Temporary persona override: set by /set_ai_name, counted down in /send_message.
special_persona_mode = None
special_persona_interactions_left = 0
|
|
# Braille-character picture embedded in the chat reply built by _konami_code_action().
SNAKE_ASCII = """
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⠿⠟⠛⠛⠋⠉⠛⠟⢿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⡏⠉⠹⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠘⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⠀⢸⣧⡀⠀⠰⣦⡀⠀⠀⢀⠀⠀⠈⣻⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⡇⢨⣿⣿⣖⡀⢡⠉⠄⣀⢀⣀⡀⠀⠼⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⠀⠀⠘⠋⢏⢀⣰⣖⣿⣿⣿⠟⡡⠀⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣯⠁⢀⠂⡆⠉⠘⠛⠿⣿⢿⠟⢁⣬⡶⢠⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⡯⠀⢀⡀⠝⠀⠀⠀⠀⢀⠠⣩⣤⣠⣆⣾⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⡅⠀⠊⠇⢈⣴⣦⣤⣆⠈⢀⠋⠹⣿⣇⣻⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⡄⠥⡇⠀⠀⠚⠺⠯⠀⠀⠒⠛⠒⢪⢿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⡿⠿⠛⠋⠀⠘⣿⡄⠀⠀⠀⠋⠉⡉⠙⠂⢰⣾⣿⣿⣿⣿⣿⣿⣿⣿⣿
⠀⠈⠉⠀⠀⠀⠀⠀⠀⠀⠙⠷⢐⠀⠀⠀⠀⢀⢴⣿⠊⠀⠉⠉⠉⠈⠙⠉⠛⠿
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⠉⠰⣖⣴⣾⡃⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠁⠀⠀⠀⠀⠀⢀⠀⠀⠀⠀⠁⠀⠨
"""
|
|
| |
# Chat trigger phrases. _handle_chat_commands() receives the lower-cased user
# message and tests every phrase with a plain substring check (`phrase in text`),
# so a phrase also matches inside longer words (e.g. "wait" inside "waiting").
STOP_COMMANDS = {"stop", "hold", "halt", "pause", "freeze", "wait"}
AUTO_ON_WORDS = {"take over", "you drive", "auto mode"}
AUTO_OFF_WORDS = {"manual", "my turn", "stop auto"}
MILKING_CUES = {"i'm close", "make me cum", "finish me"}
EDGING_CUES = {"edge me", "start edging", "tease and deny"}
|
|
| |
|
|
def get_current_context():
    """Assemble the context dictionary passed to the LLM service.

    The dictionary combines persona/profile/pattern/rule settings, the current
    mood, the controller's last speed and depth values, the long-term-memory
    flag and the special persona mode. ``edging_elapsed_time`` is ``None``
    unless an edging start time is recorded, in which case it is a label such
    as ``"3m 12s"`` or ``"1h 3m 12s"``.
    """
    elapsed_label = None
    if edging_start_time:
        total_seconds = int(time.time() - edging_start_time)
        total_minutes, secs = divmod(total_seconds, 60)
        hrs, mins = divmod(total_minutes, 60)
        elapsed_label = f"{mins}m {secs}s"
        if hrs > 0:
            # The hour part is only shown once a full hour has passed.
            elapsed_label = f"{hrs}h {elapsed_label}"

    return {
        'persona_desc': settings.persona_desc,
        'current_mood': current_mood,
        'user_profile': settings.user_profile,
        'patterns': settings.patterns,
        'rules': settings.rules,
        'last_stroke_speed': handy.last_relative_speed,
        'last_depth_pos': handy.last_depth_pos,
        'use_long_term_memory': use_long_term_memory,
        'edging_elapsed_time': elapsed_label,
        'special_persona_mode': special_persona_mode,
    }
|
|
def add_message_to_queue(text, add_to_history=True):
    """Queue *text* for the UI, optionally record it in the chat history, and voice it.

    Args:
        text: Message for the UI; may contain markup tags.
        add_to_history: When true, the message with ``<...>`` tags removed is
            appended to ``chat_history`` as an assistant turn (skipped if
            nothing but whitespace remains).
    """
    messages_for_ui.append(text)

    if add_to_history and (plain := re.sub(r'<[^>]+>', '', text).strip()):
        chat_history.append({"role": "assistant", "content": plain})

    # Audio generation runs on its own thread so the caller is not held up.
    speech_worker = threading.Thread(target=audio.generate_audio_for_text, args=(text,))
    speech_worker.start()
|
|
def start_background_mode(mode_logic, initial_message, mode_name):
    """Stop any running background mode and start a new ``AutoModeThread``.

    Args:
        mode_logic: Mode function handed to ``AutoModeThread`` (this file passes
            ``auto_mode_logic``, ``milking_mode_logic`` or ``edging_mode_logic``).
        initial_message: Forwarded to ``AutoModeThread`` unchanged.
        mode_name: ``'auto'``, ``'milking'`` or ``'edging'``; forwarded as
            ``mode_name`` and used by the ``get_timings`` callback. Only
            ``'edging'`` starts the edging timer.
    """
    global auto_mode_active_task, edging_start_time

    # Work on a snapshot: the task's own on_stop callback may set the global
    # to None between stop() and join().
    previous_task = auto_mode_active_task
    if previous_task:
        previous_task.stop()
        # Bounded wait, so the old thread may still be alive afterwards.
        previous_task.join(timeout=5)

    user_signal_event.clear()
    mode_message_queue.clear()
    # Set explicitly in both directions so a timer left over from an earlier
    # edging run never leaks into a different mode.
    edging_start_time = time.time() if mode_name == 'edging' else None

    new_task = None

    def on_stop():
        """Clear the shared mode state, but only on behalf of the current task."""
        global auto_mode_active_task, edging_start_time
        # A task that outlived the join above must not wipe its successor's state.
        if new_task is not None and auto_mode_active_task is not new_task:
            return
        auto_mode_active_task = None
        edging_start_time = None

    def update_mood(m):
        """Store the mood reported by the running mode."""
        global current_mood
        current_mood = m

    def get_timings(n):
        """Return the (min, max) timing pair for mode *n*; (3, 5) for unknown names."""
        return {
            'auto': (settings.auto_min_time, settings.auto_max_time),
            'milking': (settings.milking_min_time, settings.milking_max_time),
            'edging': (settings.edging_min_time, settings.edging_max_time)
        }.get(n, (3, 5))

    services = {'llm': llm, 'handy': handy}
    callbacks = {
        'send_message': add_message_to_queue, 'get_context': get_current_context,
        'get_timings': get_timings, 'on_stop': on_stop, 'update_mood': update_mood,
        'user_signal_event': user_signal_event,
        'message_queue': mode_message_queue
    }
    new_task = AutoModeThread(mode_logic, initial_message, services, callbacks, mode_name=mode_name)
    # Publish before start() so on_stop can recognise its own task.
    auto_mode_active_task = new_task
    new_task.start()
|
|
| |
@app.route('/')
def home_page():
    """Serve the UI by reading ``index.html`` and rendering it as a template string.

    The file is looked up in ``sys._MEIPASS`` when ``sys.frozen`` is set
    (the attributes a PyInstaller bundle provides), otherwise next to this module.
    """
    if getattr(sys, 'frozen', False):
        base_dir = sys._MEIPASS
    else:
        base_dir = os.path.dirname(os.path.abspath(__file__))

    index_path = os.path.join(base_dir, 'index.html')
    with open(index_path, 'r', encoding='utf-8') as page:
        markup = page.read()
    return render_template_string(markup)
|
|
@app.route('/static/<path:path>')
def send_static(path):
    """Serve *path* from the ``static`` directory located beside this module."""
    module_dir = os.path.dirname(__file__)
    return send_from_directory(os.path.join(module_dir, 'static'), path)
|
|
def _konami_code_action():
    """Easter egg: run a five-second move on the device and post the picture.

    The move/sleep/stop sequence runs on its own thread, so this function
    returns as soon as the chat message has been queued.
    """
    def run_burst():
        handy.move(speed=100, depth=50, span=100)
        time.sleep(5)
        handy.stop()

    burst_worker = threading.Thread(target=run_burst)
    burst_worker.start()
    add_message_to_queue(f"Kept you waiting, huh?<pre>{SNAKE_ASCII}</pre>")
|
|
def _handle_chat_commands(text):
    """Run a built-in chat command when *text* contains one of the trigger phrases.

    Phrases are matched as substrings, in this priority order: stop, Konami
    code, auto on, auto off, edging, milking.

    Every command that stops or replaces the running mode also stops the
    pattern runner. The sidebar route and ``cleanup()`` already do this;
    without it a typed "stop" left a pattern mode running, and a chat-started
    mode ran next to it on the same device.

    Args:
        text: The user's message, already lower-cased by the caller.

    Returns:
        ``(True, response)`` when a command was handled, where *response* is
        the JSON response to send back; ``(False, None)`` otherwise.
    """
    def mentions(phrases):
        return any(phrase in text for phrase in phrases)

    # Snapshot: a finishing task clears the global from its own thread.
    task = auto_mode_active_task

    if mentions(STOP_COMMANDS):
        if task:
            task.stop()
        pattern_mode_runner.stop_current_mode()
        handy.stop()
        add_message_to_queue("Stopping.", add_to_history=False)
        return True, jsonify({"status": "stopped"})

    if "up up down down left right left right b a" in text:
        _konami_code_action()
        return True, jsonify({"status": "konami_code_activated"})

    if mentions(AUTO_ON_WORDS) and not task:
        pattern_mode_runner.stop_current_mode()
        start_background_mode(auto_mode_logic, "Okay, I'll take over...", mode_name='auto')
        return True, jsonify({"status": "auto_started"})

    # A running pattern mode counts as "auto" too (that is what /poll_messages reports).
    if mentions(AUTO_OFF_WORDS) and (task or pattern_mode_runner.is_running()):
        if task:
            task.stop()
        pattern_mode_runner.stop_current_mode()
        return True, jsonify({"status": "auto_stopped"})

    if mentions(EDGING_CUES):
        pattern_mode_runner.stop_current_mode()
        start_background_mode(edging_mode_logic, "Let's play an edging game...", mode_name='edging')
        return True, jsonify({"status": "edging_started"})

    if mentions(MILKING_CUES):
        pattern_mode_runner.stop_current_mode()
        start_background_mode(milking_mode_logic, "You're so close... I'm taking over completely now.", mode_name='milking')
        return True, jsonify({"status": "milking_started"})

    return False, None
|
|
@app.route('/send_message', methods=['POST'])
def handle_user_message():
    """Handle one chat message posted by the UI.

    Flow:
      1. Persist a changed persona description or device key sent with the message.
      2. Bail out with ``no_key_set`` / ``empty_message`` when there is nothing to do.
      3. Record the message and let ``_handle_chat_commands`` try it as a command.
      4. If a background mode is running, relay the message to it.
      5. Otherwise ask the LLM, then apply its chat text, mood and move.

    A missing, malformed or non-object JSON body and a non-string ``message``
    are treated as an empty message instead of raising ``AttributeError``.

    Returns:
        A JSON response whose ``status`` field names the outcome.
    """
    global special_persona_mode, special_persona_interactions_left, current_mood

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    raw_message = payload.get('message')
    user_input = raw_message.strip() if isinstance(raw_message, str) else ''

    if (p := payload.get('persona_desc')) and p != settings.persona_desc:
        settings.persona_desc = p
        settings.save()
    if (k := payload.get('key')) and k != settings.handy_key:
        handy.set_api_key(k)
        settings.handy_key = k
        settings.save()

    if not handy.handy_key:
        return jsonify({"status": "no_key_set"})
    if not user_input:
        return jsonify({"status": "empty_message"})

    chat_history.append({"role": "user", "content": user_input})

    handled, response = _handle_chat_commands(user_input.lower())
    if handled:
        return response

    if auto_mode_active_task:
        # The running mode consumes user messages from this queue itself.
        mode_message_queue.append(user_input)
        return jsonify({"status": "message_relayed_to_active_mode"})

    llm_response = llm.get_chat_response(chat_history, get_current_context())

    if special_persona_mode is not None:
        # The special persona lasts for a fixed number of LLM replies.
        special_persona_interactions_left -= 1
        if special_persona_interactions_left <= 0:
            special_persona_mode = None
            add_message_to_queue("(Personality core reverted to standard operation.)", add_to_history=False)

    if chat_text := llm_response.get("chat"):
        add_message_to_queue(chat_text)
    if new_mood := llm_response.get("new_mood"):
        current_mood = new_mood
    if not auto_mode_active_task and (move := llm_response.get("move")):
        handy.move(move.get("sp"), move.get("dp"), move.get("rng"))
    return jsonify({"status": "ok"})
|
|
@app.route('/check_settings')
def check_settings_route():
    """Report whether the app is configured and, if so, return the stored settings.

    "Configured" means a device key is stored and ``min_depth`` is below
    ``max_depth``; otherwise only ``{"configured": False}`` is returned.
    """
    if not (settings.handy_key and settings.min_depth < settings.max_depth):
        return jsonify({"configured": False})

    timings = {
        "auto_min": settings.auto_min_time,
        "auto_max": settings.auto_max_time,
        "milking_min": settings.milking_min_time,
        "milking_max": settings.milking_max_time,
        "edging_min": settings.edging_min_time,
        "edging_max": settings.edging_max_time,
    }
    return jsonify({
        "configured": True,
        "persona": settings.persona_desc,
        "handy_key": settings.handy_key,
        "ai_name": settings.ai_name,
        "elevenlabs_key": settings.elevenlabs_api_key,
        "pfp": settings.profile_picture_b64,
        "timings": timings,
    })
|
|
@app.route('/set_ai_name', methods=['POST'])
def set_ai_name_route():
    """Store the AI's display name; the name "glados" switches on a special persona.

    A missing body, a non-object body, a non-string ``name`` or a blank name
    all fall back to ``'BOT'`` (previously ``{"name": null}`` raised
    ``AttributeError``).

    Returns:
        ``special_persona_activated`` for the GLaDOS name (any letter case),
        otherwise ``success`` together with the stored name.
    """
    global special_persona_mode, special_persona_interactions_left

    payload = request.get_json(silent=True)
    raw_name = payload.get('name') if isinstance(payload, dict) else None
    name = raw_name.strip() if isinstance(raw_name, str) else ''
    if not name:
        name = 'BOT'

    if name.lower() == 'glados':
        special_persona_mode = "GLaDOS"
        # Counted down once per LLM reply in /send_message.
        special_persona_interactions_left = 5
        settings.ai_name = "GLaDOS"
        settings.save()
        return jsonify({"status": "special_persona_activated", "persona": "GLaDOS", "message": "Oh, it's *you*."})

    settings.ai_name = name
    settings.save()
    return jsonify({"status": "success", "name": name})
|
|
@app.route('/signal_edge', methods=['POST'])
def signal_edge_route():
    """Set the user-signal event for the edging mode; HTTP 400 when it is not running."""
    edging_running = auto_mode_active_task and auto_mode_active_task.name == 'edging'
    if not edging_running:
        return jsonify({"status": "ignored", "message": "Edging mode not active."}), 400

    user_signal_event.set()
    return jsonify({"status": "signaled"})
|
|
@app.route('/set_profile_picture', methods=['POST'])
def set_pfp_route():
    """Persist the base64 profile picture sent as ``pfp_b64``; HTTP 400 if it is missing."""
    image_b64 = request.json.get('pfp_b64')
    if image_b64:
        settings.profile_picture_b64 = image_b64
        settings.save()
        return jsonify({"status": "success"})
    return jsonify({"status": "error", "message": "Missing image data"}), 400
|
|
@app.route('/set_handy_key', methods=['POST'])
def set_handy_key_route():
    """Apply a new device key to the controller and persist it; HTTP 400 if it is missing."""
    new_key = request.json.get('key')
    if new_key:
        handy.set_api_key(new_key)
        settings.handy_key = new_key
        settings.save()
        return jsonify({"status": "success"})
    return jsonify({"status": "error", "message": "Key is missing"}), 400
|
|
@app.route('/nudge', methods=['POST'])
def nudge_route():
    """Nudge the device in the requested direction and report the new depth in percent.

    While the stored calibration position is still 0.0, it is first seeded
    from the device's reported position (if that is truthy).
    """
    global calibration_pos_mm

    if calibration_pos_mm == 0.0:
        device_pos = handy.get_position_mm()
        if device_pos:
            calibration_pos_mm = device_pos

    nudge_direction = request.json.get('direction')
    # NOTE(review): 0 and 100 are presumably the lower/upper bounds expected by
    # handy.nudge(); confirm against the controller.
    calibration_pos_mm = handy.nudge(nudge_direction, 0, 100, calibration_pos_mm)
    depth_percent = handy.mm_to_percent(calibration_pos_mm)
    return jsonify({"status": "ok", "depth_percent": depth_percent})
|
|
@app.route('/setup_elevenlabs', methods=['POST'])
def elevenlabs_setup_route():
    """Configure the voice service from ``api_key``, ``voice_id`` and ``enabled``.

    Steps: accept the API key if one is supplied, refresh the voice list,
    configure the chosen voice, then persist the result. Any failing step
    answers with HTTP 400; success returns the available voices.
    """
    payload = request.json
    new_key = payload.get('api_key')
    chosen_voice = payload.get('voice_id')
    voice_enabled = payload.get('enabled', False)

    def fail(message):
        return jsonify({"status": "error", "message": message}), 400

    if new_key:
        if audio.set_api_key(new_key):
            settings.elevenlabs_api_key = new_key
        else:
            return fail("Invalid API key")

    listing = audio.fetch_available_voices()
    if listing.get("status") == "error":
        return jsonify(listing), 400

    configured, detail = audio.configure_voice(chosen_voice, voice_enabled)
    if not configured:
        return fail(detail)

    # A disabled voice is stored as an empty id.
    settings.elevenlabs_voice_id = chosen_voice if voice_enabled else ""
    settings.save()

    return jsonify({"status": "success", "voices": audio.available_voices})
|
|
@app.route('/get_audio', methods=['GET'])
def get_audio_route():
    """Return the next audio chunk as ``audio/mpeg``, or HTTP 204 when none is available."""
    chunk = audio.get_next_audio_chunk()
    if not chunk:
        return jsonify({"status": "no_audio"}), 204
    return send_file(io.BytesIO(chunk), mimetype="audio/mpeg", as_attachment=False)
|
|
@app.route('/poll_messages', methods=['GET'])
def poll_messages_route():
    """Hand the queued UI messages to the client together with the mode status.

    Returns a JSON object with:
        messages: All messages queued since the last poll (possibly empty).
        mood: The current mood string.
        auto_active: True while a background task or a pattern mode is running.
        auto_mode_name: The task's name, ``"pattern"`` for a pattern mode, else None.
        edging_elapsed: Whole seconds since edging started, else None.
    """
    # Drain with popleft() rather than list() + clear(): background threads
    # append to this deque, and anything added between the copy and the clear
    # used to be silently discarded.
    pending = []
    while True:
        try:
            pending.append(messages_for_ui.popleft())
        except IndexError:
            break

    # Snapshots: both globals can be reset by a finishing task mid-request.
    task = auto_mode_active_task
    started = edging_start_time
    pattern_running = pattern_mode_runner.is_running()

    if task:
        mode_name = task.name
    elif pattern_running:
        mode_name = "pattern"
    else:
        mode_name = None

    return jsonify({
        "messages": pending,
        "mood": current_mood,
        "auto_active": bool(task) or pattern_running,
        "auto_mode_name": mode_name,
        "edging_elapsed": int(time.time() - started) if started else None
    })
|
|
@app.route('/save_timings', methods=['POST'])
def save_timings_route():
    """Validate and persist the min/max timing pairs for the three modes.

    Every value is parsed before any setting is touched, so a bad request no
    longer leaves the in-memory settings half-updated. Missing fields fall
    back to the defaults listed below.

    Returns:
        ``{"status": "success"}``, or HTTP 400 naming the first field that is
        not a finite, non-negative number.
    """
    import math  # local import: only this route needs it

    defaults = (
        ('auto_min_time', 4.0), ('auto_max_time', 7.0),
        ('milking_min_time', 2.5), ('milking_max_time', 4.5),
        ('edging_min_time', 5.0), ('edging_max_time', 8.0),
    )

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    parsed = {}
    for field, fallback in defaults:
        try:
            value = float(payload.get(field, fallback))
        except (TypeError, ValueError):
            return jsonify({"status": "error", "message": f"Invalid value for {field}"}), 400
        if not math.isfinite(value) or value < 0:
            return jsonify({"status": "error", "message": f"Invalid value for {field}"}), 400
        parsed[field] = value

    # The request field names are also the attribute names on the settings object.
    for field, value in parsed.items():
        setattr(settings, field, value)
    settings.save()
    return jsonify({"status": "success"})
|
|
@app.route('/sidebar_mode_action', methods=['POST'])
def sidebar_mode_action_route():
    """Start, toggle or stop a mode from the sidebar.

    Request JSON:
        action: ``'auto'`` (toggle), ``'edging'``, ``'milking'`` or ``'stop'``.
        use_patterns: When true, try the pattern runner first and fall back to
            the LLM-driven background mode if it does not start.

    Returns:
        A JSON ``status`` describing what happened; HTTP 400 for an unknown
        or missing action.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    action = payload.get('action')
    use_patterns = payload.get('use_patterns', False)

    # Snapshot: a finishing task clears the global from its own thread.
    task = auto_mode_active_task

    if action == 'stop':
        if task:
            task.stop()
        pattern_mode_runner.stop_current_mode()
        handy.stop()
        return jsonify({"status": "stopped"})

    if action == 'auto':
        # /poll_messages reports a running pattern mode as "auto active" too,
        # so the toggle has to switch that off rather than start another mode.
        if task or pattern_mode_runner.is_running():
            if task:
                task.stop()
            pattern_mode_runner.stop_current_mode()
            return jsonify({"status": "auto_stopped"})
        if use_patterns and pattern_mode_runner.start_pattern_mode('auto'):
            add_message_to_queue("Starting auto pattern mode...", add_to_history=False)
            return jsonify({"status": "pattern_auto_started"})
        start_background_mode(auto_mode_logic, "Taking control now...", mode_name='auto')
        return jsonify({"status": "auto_started"})

    # action -> (pattern key, pattern message, pattern status,
    #            mode logic, mode message, mode name, mode status)
    mode_specs = {
        'edging': ('edge', "Starting edging pattern mode...", "pattern_edging_started",
                   edging_mode_logic, "Let's begin edging...", 'edging', "edging_started"),
        'milking': ('orgasm', "Starting milking pattern mode...", "pattern_milking_started",
                    milking_mode_logic, "Milking time...", 'milking', "milking_started"),
    }
    spec = mode_specs.get(action)
    if spec is None:
        return jsonify({"status": "unknown_action"}), 400

    pattern_key, pattern_msg, pattern_status, mode_logic, mode_msg, mode_name, mode_status = spec
    pattern_mode_runner.stop_current_mode()
    if use_patterns:
        # Stop the LLM-driven task first so it does not keep running next to
        # the pattern mode on the same device.
        if task:
            task.stop()
        if pattern_mode_runner.start_pattern_mode(pattern_key):
            add_message_to_queue(pattern_msg, add_to_history=False)
            return jsonify({"status": pattern_status})
    start_background_mode(mode_logic, mode_msg, mode_name=mode_name)
    return jsonify({"status": mode_status})
|
|
def cleanup():
    """Stop every running mode and the device, then save settings, at interpreter exit.

    The steps are chained with ``try``/``finally`` so a failure while stopping
    a mode can no longer skip stopping the device or saving the settings; the
    exception itself still propagates.
    """
    # Snapshot: the task's on_stop callback may set the global to None between
    # stop() and join().
    task = auto_mode_active_task
    try:
        if task:
            task.stop()
            task.join(timeout=5)
        pattern_mode_runner.stop_current_mode()
    finally:
        try:
            handy.stop()
        finally:
            settings.save(llm, chat_history)


atexit.register(cleanup)
|
|
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    app.secret_key = os.environ.get("SESSION_SECRET", "dev-secret-key-change-in-production")
    # The server listens on all interfaces, and the interactive debugger that
    # debug mode enables allows code execution from the browser. Debug mode is
    # therefore opt-in (FLASK_DEBUG=1) instead of always on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)
|
|