|
from time import sleep |
|
from typing import Optional |
|
|
|
import requests |
|
|
|
from internals.data.task import Task |
|
from internals.util.config import get_environment |
|
|
|
|
|
class Slack:
    """Send task notifications and failure reports to Slack webhooks.

    Two webhooks are used: ``webhook_url`` receives the regular task alerts
    built by :meth:`send_alert`, and ``error_webhook`` receives the failure
    reports built by :meth:`error_alert`.
    """

    # NOTE(review): these webhook URLs are credentials committed to source.
    # They are kept as defaults so existing ``Slack()`` callers keep working,
    # but they should be moved to configuration/secret storage and rotated.
    DEFAULT_WEBHOOK_URL = "https://hooks.slack.com/services/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"
    DEFAULT_ERROR_WEBHOOK = "https://hooks.slack.com/services/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"

    # Seconds to wait for Slack before giving up; without a timeout
    # ``requests.post`` can block the calling worker indefinitely.
    REQUEST_TIMEOUT = 10

    # Keys removed from the task payload before it is rendered as a message.
    _HIDDEN_KEYS = (
        "queue_name",
        "attempt",
        "timestamp",
        "task_id",
        "maskImageUrl",
        "aux_imageUrl",
    )

    def __init__(
        self,
        webhook_url: Optional[str] = None,
        error_webhook: Optional[str] = None,
    ):
        """Create a notifier.

        Args:
            webhook_url: Webhook for regular alerts. Falls back to
                ``DEFAULT_WEBHOOK_URL`` when omitted or empty.
            error_webhook: Webhook for failure reports. Falls back to
                ``DEFAULT_ERROR_WEBHOOK`` when omitted or empty.
        """
        self.webhook_url = webhook_url or self.DEFAULT_WEBHOOK_URL
        self.error_webhook = error_webhook or self.DEFAULT_ERROR_WEBHOOK

    @staticmethod
    def _format_message(raw: dict) -> str:
        """Render ``raw`` as one ``*key*: value`` line per truthy entry.

        Falsy values (``None``, ``""``, ``0``, empty containers) are skipped.
        List values are rendered as their items joined by ``", "``; each item
        is converted with ``str`` so numeric or mixed lists are safe to join.
        """
        lines = []
        for key, value in raw.items():
            if not value:
                continue
            if isinstance(value, list):
                # Stringify per item: joining ``str(value)`` would interleave
                # separators between the characters of the list's repr, and
                # joining non-string items directly raises TypeError.
                value = ", ".join(str(item) for item in value)
            lines.append(f"*{key}*: {value}\n")
        return "".join(lines)

    def send_alert(self, task: Task, args: Optional[dict]):
        """Post a summary of ``task`` to the regular alert webhook.

        The message is built from a copy of ``task.get_raw()`` plus an
        ``environment`` entry, minus the keys in ``_HIDDEN_KEYS``, and then
        overlaid with ``args`` when it is not ``None``.

        Args:
            task: Task whose raw payload is reported.
            args: Extra key/value pairs that override or extend the payload.

        Raises:
            requests.exceptions.RequestException: If the HTTP request fails
                or does not complete within ``REQUEST_TIMEOUT`` seconds.
        """
        # Work on a copy so the task's own payload is left untouched.
        raw = task.get_raw().copy()
        raw["environment"] = get_environment()
        for hidden in self._HIDDEN_KEYS:
            raw.pop(hidden, None)

        if args is not None:
            raw.update(args)

        requests.post(
            self.webhook_url,
            headers={"Content-Type": "application/json"},
            json={"text": self._format_message(raw)},
            timeout=self.REQUEST_TIMEOUT,
        )

    def error_alert(self, task: Task, e: Exception):
        """Post a failure report for ``task`` to the error webhook.

        Args:
            task: Task that failed; its raw payload is included verbatim.
            e: Exception describing the failure.

        Raises:
            requests.exceptions.RequestException: If the HTTP request fails
                or does not complete within ``REQUEST_TIMEOUT`` seconds.
        """
        requests.post(
            self.error_webhook,
            headers={"Content-Type": "application/json"},
            json={
                "text": "Task failed:\n{} \n error is: \n {}".format(task.get_raw(), e)
            },
            timeout=self.REQUEST_TIMEOUT,
        )

    def auto_send_alert(self, func):
        """Decorate ``func`` so an alert is sent after every call.

        The wrapper calls ``func``, then passes its return value to
        :meth:`send_alert` as ``args`` together with the first positional
        argument that is a ``Task`` (or an empty ``Task({})`` when there is
        none), and finally returns ``func``'s result unchanged.
        """
        # Local import keeps this block self-contained.
        from functools import wraps

        @wraps(func)  # preserve the wrapped function's name and docstring
        def inner(*args, **kwargs):
            rargs = func(*args, **kwargs)
            # isinstance (rather than an exact type match) also recognises
            # Task subclasses passed positionally.
            task = next(
                (arg for arg in args if isinstance(arg, Task)),
                Task({}),
            )
            self.send_alert(task, rargs)
            return rargs

        return inner
|
|