wa-api-v2 / examples /python /whatsapp_echo_bot.py
baagas0's picture
Fix port
fc7a493
raw
history blame contribute delete
No virus
3.25 kB
import random
from pprint import pprint
from time import sleep
import requests
from flask import Flask
from flask import request
app = Flask(__name__)
def send_message(chat_id, text):
"""
Send message to chat_id.
:param chat_id: Phone number + "@c.us" suffix - 1231231231@c.us
:param text: Message for the recipient
"""
# Send a text back via WhatsApp HTTP API
response = requests.post(
"http://localhost:7860/api/sendText",
json={
"chatId": chat_id,
"text": text,
"session": "default",
},
)
response.raise_for_status()
def reply(chat_id, message_id, text):
response = requests.post(
"http://localhost:7860/api/reply",
json={
"chatId": chat_id,
"text": text,
"reply_to": message_id,
"session": "default",
},
)
response.raise_for_status()
def send_seen(chat_id, message_id, participant):
response = requests.post(
"http://localhost:7860/api/sendSeen",
json={
"session": "default",
"chatId": chat_id,
"messageId": message_id,
"participant": participant,
},
)
response.raise_for_status()
def start_typing(chat_id):
response = requests.post(
"http://localhost:7860/api/startTyping",
json={
"session": "default",
"chatId": chat_id,
},
)
response.raise_for_status()
def stop_typing(chat_id):
response = requests.post(
"http://localhost:7860/api/stopTyping",
json={
"session": "default",
"chatId": chat_id,
},
)
response.raise_for_status()
def typing(chat_id, seconds):
start_typing(chat_id=chat_id)
sleep(seconds)
stop_typing(chat_id=chat_id)
@app.route("/")
def whatsapp_echo():
return "WhatsApp Echo Bot is ready!"
@app.route("/bot", methods=["GET", "POST"])
def whatsapp_webhook():
if request.method == "GET":
return "WhatsApp Echo Bot is ready!"
data = request.get_json()
pprint(data)
if data["event"] != "message":
# We can't process other event yet
return f"Unknown event {data['event']}"
# Payload that we've got
payload = data["payload"]
# The text
text = payload.get("body")
if not text:
# We can't process non-text messages yet
print("No text in message")
print(payload)
return "OK"
# Number in format 1231231231@c.us or @g.us for group
chat_id = payload["from"]
# Message ID - false_11111111111@c.us_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
message_id = payload['id']
# For groups - who sent the message
participant = payload.get('participant')
# IMPORTANT - Always send seen before sending new message
send_seen(chat_id=chat_id, message_id=message_id, participant=participant)
# Send a text back via WhatsApp HTTP API
typing(chat_id=chat_id, seconds=random.random() * 3)
send_message(chat_id=chat_id, text=text)
# OR reply on the message
typing(chat_id=chat_id, seconds=random.random() * 3)
reply(chat_id=chat_id, message_id=message_id, text=text)
# Send OK back
return "OK"