|
import random |
|
from pprint import pprint |
|
from time import sleep |
|
|
|
import requests |
|
from flask import Flask |
|
from flask import request |
|
|
|
app = Flask(__name__) |
|
|
|
|
|
def send_message(chat_id, text): |
|
""" |
|
Send message to chat_id. |
|
:param chat_id: Phone number + "@c.us" suffix - 1231231231@c.us |
|
:param text: Message for the recipient |
|
""" |
|
|
|
response = requests.post( |
|
"http://localhost:7860/api/sendText", |
|
json={ |
|
"chatId": chat_id, |
|
"text": text, |
|
"session": "default", |
|
}, |
|
) |
|
response.raise_for_status() |
|
|
|
def reply(chat_id, message_id, text): |
|
response = requests.post( |
|
"http://localhost:7860/api/reply", |
|
json={ |
|
"chatId": chat_id, |
|
"text": text, |
|
"reply_to": message_id, |
|
"session": "default", |
|
}, |
|
) |
|
response.raise_for_status() |
|
|
|
|
|
def send_seen(chat_id, message_id, participant): |
|
response = requests.post( |
|
"http://localhost:7860/api/sendSeen", |
|
json={ |
|
"session": "default", |
|
"chatId": chat_id, |
|
"messageId": message_id, |
|
"participant": participant, |
|
}, |
|
) |
|
response.raise_for_status() |
|
|
|
def start_typing(chat_id): |
|
response = requests.post( |
|
"http://localhost:7860/api/startTyping", |
|
json={ |
|
"session": "default", |
|
"chatId": chat_id, |
|
}, |
|
) |
|
response.raise_for_status() |
|
|
|
def stop_typing(chat_id): |
|
response = requests.post( |
|
"http://localhost:7860/api/stopTyping", |
|
json={ |
|
"session": "default", |
|
"chatId": chat_id, |
|
}, |
|
) |
|
response.raise_for_status() |
|
|
|
def typing(chat_id, seconds): |
|
start_typing(chat_id=chat_id) |
|
sleep(seconds) |
|
stop_typing(chat_id=chat_id) |
|
|
|
@app.route("/") |
|
def whatsapp_echo(): |
|
return "WhatsApp Echo Bot is ready!" |
|
|
|
|
|
@app.route("/bot", methods=["GET", "POST"]) |
|
def whatsapp_webhook(): |
|
if request.method == "GET": |
|
return "WhatsApp Echo Bot is ready!" |
|
|
|
data = request.get_json() |
|
pprint(data) |
|
if data["event"] != "message": |
|
|
|
return f"Unknown event {data['event']}" |
|
|
|
|
|
payload = data["payload"] |
|
|
|
text = payload.get("body") |
|
if not text: |
|
|
|
print("No text in message") |
|
print(payload) |
|
return "OK" |
|
|
|
chat_id = payload["from"] |
|
|
|
message_id = payload['id'] |
|
|
|
participant = payload.get('participant') |
|
|
|
send_seen(chat_id=chat_id, message_id=message_id, participant=participant) |
|
|
|
|
|
|
|
typing(chat_id=chat_id, seconds=random.random() * 3) |
|
send_message(chat_id=chat_id, text=text) |
|
|
|
|
|
typing(chat_id=chat_id, seconds=random.random() * 3) |
|
reply(chat_id=chat_id, message_id=message_id, text=text) |
|
|
|
|
|
return "OK" |
|
|