import os
import random
import string

import requests
from flask import Flask, request, jsonify
| |
|
# Flask application object; the route handler in this file is registered on it.
app = Flask(__name__)
| |
|
| | |
| | |
| | |
# Botpress identifiers and credential used by every API call in this module.
# Each value can be overridden with an environment variable; the literals are
# kept only as fallbacks so existing deployments keep working unchanged.
# BOTPRESS_TOKEN must contain the full header value, including "Bearer ".
# NOTE(review): a personal access token is committed here in clear text. It
# should be rotated, and this fallback removed once BOTPRESS_TOKEN is set
# wherever the service runs.
GLOBAL_WORKSPACE_ID = os.environ.get(
    "BOTPRESS_WORKSPACE_ID", "wkspace_01JVCT7SCHHQBH0PTCQNTF6TYF"
)
GLOBAL_BOT_ID = os.environ.get(
    "BOTPRESS_BOT_ID", "5d89e4f7-c1bd-4238-92c4-e4188e74fd49"
)
TOKEN = os.environ.get(
    "BOTPRESS_TOKEN", "Bearer bp_pat_vTuxol25N0ymBpYaWqtWpFfGPKt260IfT784"
)
| |
|
| | |
| | |
| | |
def generate_random_name(length=5):
    """
    Build a random name made only of ASCII letters (upper and lower case).

    `length` is the number of characters to draw; letters may repeat.
    """
    picked_letters = random.choices(string.ascii_letters, k=length)
    return "".join(picked_letters)
| |
|
| |
|
| | |
| | |
| | |
def create_workspace(timeout=30):
    """
    Create a Botpress workspace with a randomly generated name.

    `timeout` is the number of seconds to wait for the Botpress API.

    Returns the `id` field of the JSON response, or None when the request
    cannot be completed, the API answers with a status other than 200, or
    the response body cannot be read as a JSON object.
    """
    ws_url = "https://api.botpress.cloud/v1/admin/workspaces"
    headers = {
        "User-Agent": "Mozilla/5.0",
        "Authorization": TOKEN,
    }
    payload = {"name": generate_random_name()}

    try:
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.post(
            ws_url, headers=headers, json=payload, timeout=timeout
        )
    except requests.RequestException as exc:
        print(f"Workspace creation failed with: {exc}")
        return None

    if response.status_code != 200:
        print(f"Workspace creation failed with: {response.status_code}, {response.text}")
        return None

    try:
        return response.json().get('id')
    except (ValueError, AttributeError) as exc:
        # ValueError: the body is not JSON. AttributeError: it is JSON but
        # not an object, so there is no `.get`.
        print(f"Workspace creation returned an unreadable body: {exc}")
        return None
| |
|
| |
|
def create_bot(workspace_id, timeout=30):
    """
    Create a Botpress bot with a randomly generated name in `workspace_id`.

    `timeout` is the number of seconds to wait for the Botpress API.

    Returns the bot ID found under `bot.id` in the JSON response, or None
    when the request cannot be completed, the API answers with a status
    other than 200, or the response carries no bot ID.
    """
    bot_url = "https://api.botpress.cloud/v1/admin/bots"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
        "x-workspace-id": workspace_id,
        "Authorization": TOKEN,
    }
    payload = {"name": generate_random_name()}

    try:
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.post(
            bot_url, headers=headers, json=payload, timeout=timeout
        )
    except requests.RequestException as exc:
        print(f"Bot creation failed with: {exc}")
        return None

    if response.status_code != 200:
        print(f"Bot creation failed with: {response.status_code}, {response.text}")
        return None

    try:
        # `or {}` also covers a body where "bot" is present but null.
        bot_id = (response.json().get("bot") or {}).get("id")
    except (ValueError, AttributeError):
        # Body is not JSON, or not shaped like {"bot": {...}}.
        bot_id = None

    if not bot_id:
        print("Bot ID not found in the response.")
    return bot_id
| |
|
| |
|
def delete_bot(bot_id, workspace_id, timeout=30):
    """
    Send a DELETE request for bot `bot_id`, scoped to `workspace_id`.

    `timeout` is the number of seconds to wait for the Botpress API; without
    it a stalled connection would block the caller indefinitely.

    Returns the `requests.Response` as-is; the status code is not checked.
    Raises `requests.RequestException` if the request cannot be completed
    (including when the timeout expires).
    """
    url = f"https://api.botpress.cloud/v1/admin/bots/{bot_id}"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
        "x-workspace-id": workspace_id,
        "Authorization": TOKEN,
    }
    return requests.delete(url, headers=headers, timeout=timeout)
| |
|
| |
|
def delete_workspace(workspace_id, timeout=30):
    """
    Send a DELETE request for workspace `workspace_id`.

    `timeout` is the number of seconds to wait for the Botpress API; without
    it a stalled connection would block the caller indefinitely.

    Returns the `requests.Response` as-is; the status code is not checked.
    Raises `requests.RequestException` if the request cannot be completed
    (including when the timeout expires).
    """
    url = f"https://api.botpress.cloud/v1/admin/workspaces/{workspace_id}"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
        "Authorization": TOKEN,
    }
    return requests.delete(url, headers=headers, timeout=timeout)
| |
|
| |
|
| | |
| | |
| | |
def _assistant_text(response):
    """Turn a completed query response into the text handed back to the caller."""
    if response.status_code != 200:
        return f"Error {response.status_code}: {response.text}"
    try:
        # `or [{}]` also covers an empty or null "choices" list.
        choices = response.json().get('choices') or [{}]
        return choices[0].get('message', {}).get('content', '')
    except (ValueError, AttributeError, TypeError, IndexError, KeyError) as exc:
        # Body is not JSON or does not have the expected shape.
        return f"Unexpected error: {str(exc)}"


def _discard_bot_and_workspace(bot_id, workspace_id):
    """Best-effort deletion of a bot (if given) and its workspace; failures are only logged."""
    if bot_id:
        try:
            delete_bot(bot_id, workspace_id)
        except requests.RequestException as exc:
            print(f"Could not delete bot {bot_id}: {exc}")
    try:
        delete_workspace(workspace_id)
    except requests.RequestException as exc:
        print(f"Could not delete workspace {workspace_id}: {exc}")


def chat_with_assistant(user_input, image_url, bot_id, workspace_id, timeout=60):
    """
    Sends the user input (and the image URL, when one is given) to the
    Botpress chat-gpt query endpoint as a single user message.

    When the endpoint answers 403 the bot is treated as invalid or expired:
    the current bot and workspace are deleted (best effort), a new workspace
    and bot are created, and the query is retried once with the new bot.

    `timeout` is the number of seconds to wait for each query request.

    Returns a tuple (text, bot_id, workspace_id):
      - text is the assistant's reply on success, otherwise a string
        starting with "Error", "Unexpected error" or "Failed to regenerate".
      - bot_id / workspace_id are the IDs the caller should use from now on;
        both are None when new ones could not be created.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
        "x-bot-id": bot_id,
        "Content-Type": "application/json",
        "Authorization": TOKEN,
    }

    message_parts = [{"type": "text", "text": user_input}]
    if image_url:
        message_parts.append({
            "type": "image_url",
            "image_url": {"url": image_url},
        })

    payload = {
        "prompt": {
            "model": "gpt-4o",
            "signatureVersion": "Jan-2024",
            "max_tokens": 1000,
            "temperature": 0.2,
            "messages": [
                {"role": "user", "content": message_parts},
            ],
        },
        "variables": {},
        "options": {
            "origin": "agents/VisionAgent/0.1",
            "max_tokens": 100000,
        },
    }

    botpress_url = "https://api.botpress.cloud/v1/cognitive/chat-gpt/query"

    try:
        response = requests.post(
            botpress_url, json=payload, headers=headers, timeout=timeout
        )
    except requests.RequestException as exc:
        return f"Unexpected error: {str(exc)}", bot_id, workspace_id

    if response.status_code != 403:
        return _assistant_text(response), bot_id, workspace_id

    # 403: the bot ID is invalid or expired. Drop the old pair; failing to
    # delete it must not prevent creating a replacement.
    if bot_id and workspace_id:
        _discard_bot_and_workspace(bot_id, workspace_id)

    new_workspace = None
    new_bot = None
    try:
        new_workspace = create_workspace()
        # Creating a bot is pointless without a workspace to put it in.
        if new_workspace:
            new_bot = create_bot(new_workspace)
    except requests.RequestException as exc:
        print(f"Regenerating workspace/bot failed: {exc}")

    if not new_workspace or not new_bot:
        if new_workspace:
            # The caller is told to forget both IDs, so remove the workspace
            # that was just created rather than leaving it orphaned.
            _discard_bot_and_workspace(None, new_workspace)
        return "Failed to regenerate workspace or bot IDs.", None, None

    headers["x-bot-id"] = new_bot
    try:
        retry_response = requests.post(
            botpress_url, json=payload, headers=headers, timeout=timeout
        )
    except requests.RequestException as exc:
        return f"Unexpected error: {str(exc)}", new_bot, new_workspace

    return _assistant_text(retry_response), new_bot, new_workspace
| |
|
| |
|
| | |
@app.route("/image", methods=["POST"])
def chat_endpoint():
    """
    Expects JSON with:
    {
        "user_input": "string",
        "image_url": "string"  # Optional: URL of the image
    }
    Returns JSON with:
    {
        "assistant_response": "string"
    }
    Responds with status 400 when the body is not a JSON object, and 500
    when no workspace/bot pair is available and one cannot be created.
    """
    global GLOBAL_WORKSPACE_ID, GLOBAL_BOT_ID

    # silent=True yields None for unparsable bodies instead of raising, so
    # malformed JSON and non-object JSON (list, null, ...) share one 400 path.
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        return jsonify({"assistant_response": "Request body must be a JSON object."}), 400

    user_input = data.get("user_input", "")
    image_url = data.get("image_url", "")

    if not GLOBAL_WORKSPACE_ID or not GLOBAL_BOT_ID:
        GLOBAL_WORKSPACE_ID = create_workspace()
        # Only ask for a bot when there is a workspace to create it in.
        GLOBAL_BOT_ID = (
            create_bot(GLOBAL_WORKSPACE_ID) if GLOBAL_WORKSPACE_ID else None
        )
        if not GLOBAL_WORKSPACE_ID or not GLOBAL_BOT_ID:
            return jsonify({"assistant_response": "Could not create workspace or bot."}), 500

    assistant_response, updated_bot_id, updated_workspace_id = chat_with_assistant(
        user_input,
        image_url,
        GLOBAL_BOT_ID,
        GLOBAL_WORKSPACE_ID,
    )

    # The call may have replaced the bot/workspace (or cleared them to None,
    # in which case the next request creates a fresh pair).
    GLOBAL_BOT_ID = updated_bot_id
    GLOBAL_WORKSPACE_ID = updated_workspace_id

    return jsonify({"assistant_response": assistant_response})
| |
|
| |
|
| | |
if __name__ == "__main__":
    # The server listens on all interfaces, and the Werkzeug debugger lets
    # anyone who can reach it execute code, so debug mode is opt-in via
    # FLASK_DEBUG rather than always on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)