Spaces:
Build error
Build error
Validify-testbot-1
/
botbuilder-python
/libraries
/botbuilder-adapters-slack
/tests
/test_slack_client.py
# Copyright (c) Microsoft Corporation. All rights reserved. | |
# Licensed under the MIT License. | |
import hashlib | |
import hmac | |
import json | |
import os | |
import uuid | |
import datetime | |
import time | |
import aiounittest | |
import requests | |
import pytest | |
SKIP = os.getenv("SlackChannel") == "" | |
class SlackClient(aiounittest.AsyncTestCase): | |
async def test_send_and_receive_slack_message(self): | |
# Arrange | |
echo_guid = str(uuid.uuid4()) | |
# Act | |
await self._send_message_async(echo_guid) | |
response = await self._receive_message_async() | |
# Assert | |
self.assertEqual(f"Echo: {echo_guid}", response) | |
async def _receive_message_async(self) -> str: | |
last_message = "" | |
i = 0 | |
while "Echo" not in last_message and i < 60: | |
url = ( | |
f"{self._slack_url_base}/conversations.history?token=" | |
f"{self._slack_bot_token}&channel={self._slack_channel}" | |
) | |
response = requests.get( | |
url, | |
) | |
last_message = response.json()["messages"][0]["text"] | |
time.sleep(1) | |
i += 1 | |
return last_message | |
async def _send_message_async(self, echo_guid: str) -> None: | |
timestamp = str(int(datetime.datetime.utcnow().timestamp())) | |
message = self._create_message(echo_guid) | |
hub_signature = self._create_hub_signature(message, timestamp) | |
headers = { | |
"X-Slack-Request-Timestamp": timestamp, | |
"X-Slack-Signature": hub_signature, | |
"Content-type": "application/json", | |
} | |
url = f"https://{self._bot_name}.azurewebsites.net/api/messages" | |
requests.post(url, headers=headers, data=message) | |
def _create_message(self, echo_guid: str) -> str: | |
slack_event = { | |
"client_msg_id": "client_msg_id", | |
"type": "message", | |
"text": echo_guid, | |
"user": "userId", | |
"channel": self._slack_channel, | |
"channel_type": "im", | |
} | |
message = { | |
"token": self._slack_verification_token, | |
"team_id": "team_id", | |
"api_app_id": "apiAppId", | |
"event": slack_event, | |
"type": "event_callback", | |
} | |
return json.dumps(message) | |
def _create_hub_signature(self, message: str, timestamp: str) -> str: | |
signature = ["v0", timestamp, message] | |
base_string = ":".join(signature) | |
computed_signature = "V0=" + hmac.new( | |
bytes(self._slack_client_signing_secret, encoding="utf8"), | |
msg=bytes(base_string, "utf-8"), | |
digestmod=hashlib.sha256, | |
).hexdigest().upper().replace("-", "") | |
return computed_signature | |
def setUpClass(cls) -> None: | |
cls._slack_url_base: str = "https://slack.com/api" | |
cls._slack_channel = os.getenv("SlackChannel") | |
if not cls._slack_channel: | |
raise Exception('Environment variable "SlackChannel" not found.') | |
cls._slack_bot_token = os.getenv("SlackBotToken") | |
if not cls._slack_bot_token: | |
raise Exception('Environment variable "SlackBotToken" not found.') | |
cls._slack_client_signing_secret = os.getenv("SlackClientSigningSecret") | |
if not cls._slack_client_signing_secret: | |
raise Exception( | |
'Environment variable "SlackClientSigningSecret" not found.' | |
) | |
cls._slack_verification_token = os.getenv("SlackVerificationToken") | |
if not cls._slack_verification_token: | |
raise Exception('Environment variable "SlackVerificationToken" not found.') | |
cls._bot_name = os.getenv("BotName") | |
if not cls._bot_name: | |
raise Exception('Environment variable "BotName" not found.') | |